Sleeping Read-Write Locks
Posix Locks
Read-write locks are useful when a large fraction of the users of a critical section do not change any of the state protected by the lock. This allows these "readers" or "shared owners" to potentially execute in parallel. On the other hand, since the protected state also needs to change, another type of locking is required for "writers" or "exclusive owners". (Of course, if the data never changes, then no locking is required at all.) The complexity of having two types of ownership makes read-write locks slower than traditional mutexes. In general, this form of locking is only faster when readers outnumber writers.
If the length of time one holds the lock is small, then it may be worth using a spinning read-write lock. However, due to the overhead of this synchronization primitive, it is usually only worth using when the lock hold time is relatively long. In that case, the waiters are better off sleeping within the kernel until the lock is released, rather than wasting cycles spinning. On Microsoft Windows, such a locking API is implemented by the "Slim Read-Write Locks" (SRWLOCK). On Linux and other Unix-based operating systems, the pthreads pthread_rwlock_t type is available.
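For reference, the Posix interface is used as follows (a minimal example; error checking omitted):

#include <pthread.h>

static pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
static int shared_value;

int read_value(void)
{
    int v;

    /* Many readers may hold the lock simultaneously */
    pthread_rwlock_rdlock(&lock);
    v = shared_value;
    pthread_rwlock_unlock(&lock);

    return v;
}

void write_value(int v)
{
    /* Exclusive: waits until all readers and writers have left */
    pthread_rwlock_wrlock(&lock);
    shared_value = v;
    pthread_rwlock_unlock(&lock);
}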
Glibc implements these locks as follows:
typedef struct rwlock_t rwlock_t;
struct rwlock_t
{
    mutex m;               /* Protects the state below */
    unsigned wrlock;       /* Set when write-locked */
    unsigned rcount;       /* Number of readers holding the lock */
    unsigned wwait;        /* Number of waiting writers */
    unsigned rwait;        /* Number of waiting readers */
    char writer_preferred; /* Do waiting writers block new readers? */
};

int rwlock_rdlock(rwlock_t *l)
{
    mutex_lock(&l->m);

    /* Wait until there are no writers */
    while (l->wwait || (l->wrlock && l->writer_preferred))
    {
        l->rwait++;
        mutex_unlock(&l->m);
        sys_futex(&l->rwait, FUTEX_WAIT_PRIVATE, 1, NULL, NULL, 0);
        mutex_lock(&l->m);
        l->rwait--;
    }

    /* There is a reader */
    l->rcount++;
    mutex_unlock(&l->m);

    return 0;
}

int rwlock_wrlock(rwlock_t *l)
{
    mutex_lock(&l->m);

    /* Wait until there are no readers */
    while (l->rcount || l->wrlock)
    {
        l->wwait++;
        mutex_unlock(&l->m);
        sys_futex(&l->wwait, FUTEX_WAIT_PRIVATE, 1, NULL, NULL, 0);
        mutex_lock(&l->m);
        l->wwait--;
    }

    /* There is a writer */
    l->wrlock = 1;
    mutex_unlock(&l->m);

    return 0;
}

/* pthreads has merged read and write unlock */
int rwlock_unlock(rwlock_t *l)
{
    mutex_lock(&l->m);

    /* If there are readers, then we are read-locked */
    if (l->rcount)
    {
        /* One less reader */
        l->rcount--;

        /* Still have other readers */
        if (l->rcount)
        {
            mutex_unlock(&l->m);
            return 0;
        }
    }
    else
    {
        /* No longer write locked */
        l->wrlock = 0;
    }

    mutex_unlock(&l->m);

    /* Wake waiting writers first */
    if (l->wwait)
    {
        sys_futex(&l->wwait, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
        return 0;
    }

    /* Then wake all waiting readers */
    if (l->rwait)
    {
        sys_futex(&l->rwait, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
    }

    return 0;
}
The above code is obviously not exactly how Glibc implements these locks, as the real code is written in assembly and uses different identifiers. However, it is algorithmically equivalent. Note how there is only one unlock routine (as mandated by the Posix specification), which determines whether to read-unlock or write-unlock based on the current owner. There are also two different types of lock, depending on whether the writer_preferred field is set or not. If it is set, then waiting writers will prevent new readers from taking the lock. If it is unset, then new readers can preempt writers so long as other readers still have the lock. This gives two different sets of performance characteristics.
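At run time, the writer-preferring behaviour can also be requested through a lock attribute, equivalent to the static initializer used further below. A minimal sketch (error checking omitted):

#define _GNU_SOURCE
#include <pthread.h>

void init_writer_preferred(pthread_rwlock_t *lock)
{
    pthread_rwlockattr_t attr;

    pthread_rwlockattr_init(&attr);
    pthread_rwlockattr_setkind_np(&attr,
            PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
    pthread_rwlock_init(lock, &attr);
    pthread_rwlockattr_destroy(&attr);
}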
We benchmark the Posix read-write locks by having four threads (on this quad-core machine) randomly read and write lock the lock. They then spin-wait for a given number of cycles, and then unlock the lock. By varying the fraction of time we write-lock the lock as compared to read-lock it, we can see how this affects the speed.
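The benchmark loop looks roughly like the following sketch. The iteration count, the delay, and the xorshift random number generator are our own choices here; the article does not specify them:

#include <pthread.h>

static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;

#define ITERS   (1 << 20)
#define DELAY   100  /* Cycles to spin-wait inside the critical section */
#define WRITERS 25   /* Write-lock fraction, per 256 iterations */

static void *bench_thread(void *arg)
{
    /* Per-thread xorshift PRNG, seeded from the thread number */
    unsigned seed = (unsigned) (unsigned long) arg * 2654435761u + 1;
    int i, j;

    for (i = 0; i < ITERS; i++)
    {
        seed ^= seed << 13;
        seed ^= seed >> 17;
        seed ^= seed << 5;

        if ((seed & 255) < WRITERS)
        {
            pthread_rwlock_wrlock(&rwlock);
            for (j = 0; j < DELAY; j++) asm volatile("pause");
            pthread_rwlock_unlock(&rwlock);
        }
        else
        {
            pthread_rwlock_rdlock(&rwlock);
            for (j = 0; j < DELAY; j++) asm volatile("pause");
            pthread_rwlock_unlock(&rwlock);
        }
    }

    return NULL;
}

Four threads run this body concurrently, and the total wall-clock time is measured.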
The default initialization of a lock using PTHREAD_RWLOCK_INITIALIZER gives a read-preferring version:
Reader-Preferring Posix Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 1.1 | 2.0 | 4.4 | 6.1 | 6.4
Using PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP gives the writer-preferring variant:
Writer-Preferring Posix Lock
Writers per 256 |   0 |   1 |  25 |  128 | 250
Time (s)        | 1.1 | 1.6 | 6.7 | 10.7 | 6.8
Notice how the performance characteristics of the two lock types are quite different. Writer-preferring locks are better when there are mostly readers because they prevent writer starvation. However, as the writer fraction rises they perform worse than their competitor, with the worst case at about 50-50 readers to writers. Finally, when the number of writers dominates, the two types of lock perform comparably. (However, in this situation, you'd probably want to use a simple mutex instead.)
Futex Bitset Locks
The above code "obviously" isn't optimal. It uses a low level mutex to synchronize the internal state of the read-writer lock. If we can avoid this, then perhaps we can gain extra parallelism. For example, it may be possible for many readers to simultaneously lock at the same time instead of being synchronized. The obvious way to do this is to use the bitset capability of Futexes. This allows one to record classes of threads waiting on the same futex. By using one bit for readers, and one for writers, we can then wake the correct group when desired.
A writer-preferring version might look like:
/* Sleeping Read-Write Lock (writer preferring) */
typedef union rwlock_t rwlock_t;
union rwlock_t
{
    unsigned long long ull;
    unsigned u;
    struct
    {
        unsigned char wlocked;
        unsigned char wcontended;
        unsigned char rlocked;
        unsigned char rcontended;
    } b;
};

#define RW_WLOCKED   (1ULL << 0)
#define RW_WCONTEND  (1ULL << 8)
#define RW_RLOCKED   (1ULL << 16)
#define RW_RCONTEND  (1ULL << 24)
#define RW_READS     (1ULL << 32)

#define RW_WRITER 1
#define RW_READER 2

void rwlock_wlock(rwlock_t *rw)
{
    while (1)
    {
        /* Atomic read of lock state */
        unsigned state = rw->u;
        barrier();

        /* Try to set write-locked if there are no readers */
        if (!(state & (RW_WLOCKED | RW_RLOCKED)))
        {
            /* If no readers, set write-locked bit */
            if (cmpxchg(&rw->ull, state, state | RW_WLOCKED) == state) return;

            /* Don't burn too much cpu waiting for RW_RLOCKED to be set */
            cpu_relax();

            /* Try again */
            continue;
        }

        /* Wait */
        rw->b.wcontended = 1;
        state |= RW_WCONTEND;
        sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_WRITER);

        /* Other writers may exist */
        rw->b.wcontended = 1;
        barrier();
    }
}

void rwlock_wunlock(rwlock_t *rw)
{
    unsigned state = rw->u;
    int i;

    /* Locked and uncontended */
    if ((state == 1) && (cmpxchg(&rw->u, 1, 0) == 1)) return;

    /* Unlock */
    rw->b.wlocked = 0;
    barrier();

    /* Spin and hope someone takes the lock */
    for (i = 0; i < 200; i++)
    {
        if (rw->b.wlocked || rw->b.rlocked) return;
        cpu_relax();
    }

    /* Wake a writer */
    if (rw->b.wcontended)
    {
        rw->b.wcontended = 0;
        if (sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER)) return;
    }

    /* Wake all readers */
    if (rw->b.rcontended)
    {
        rw->b.rcontended = 0;
        sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, NULL, NULL, RW_READER);
    }
}

void rwlock_rlock(rwlock_t *rw)
{
    while (1)
    {
        unsigned long long state = atomic_xadd(&rw->ull, RW_READS);

        /* Success? */
        if (!(state & (RW_WLOCKED | RW_WCONTEND)))
        {
            /* Already marked read-locked */
            if (state & RW_RLOCKED) return;

            /* Set read-locked bit so write-lock futex wait works correctly */
            rw->b.rlocked = 1;
            return;
        }

        /* Read unlock */
        state = atomic_xadd(&rw->ull, -RW_READS);

        /* Is it not write locked yet? */
        if (!(state & RW_WLOCKED))
        {
            /* Not write-contended, try again */
            if (!(state & RW_WCONTEND)) continue;

            state -= RW_READS;

            /* Wake a writer if needed */
            if (!(state >> 32))
            {
                /* Only wake on a readlocked -> writelocked transition */
                if (xchg_8(&rw->b.rlocked, 0) == 1)
                {
                    rw->b.wcontended = 0;
                    if (!sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER))
                    {
                        /* None there, so mustn't be contended any more */
                        continue;
                    }
                }
            }
        }

        state |= RW_RCONTEND;
        rw->b.rcontended = 1;
        sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_READER);
    }
}

void rwlock_runlock(rwlock_t *rw)
{
    /* Read unlock */
    unsigned long long state = atomic_xadd(&rw->ull, -RW_READS);
    int i;

    state -= RW_READS;

    while (1)
    {
        /* Other readers there, don't do anything */
        if (state >> 32) return;

        /* Try to turn off read-locked flag */
        if (cmpxchg(&rw->ull, state, state & ~RW_RLOCKED) == state) break;

        /* Failure, try again */
        state = rw->ull;
        cpu_relax();
    }

    /* Spin and hope someone takes the lock, or have no one waiting */
    for (i = 0; i < 200; i++)
    {
        if (rw->b.wlocked || rw->b.rlocked || !rw->b.wcontended) return;
        cpu_relax();
    }

    /* We need to wake someone up */
    rw->b.wcontended = 0;
    sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER);
}
This (provided gcc is prevented from inlining the functions) gives the following results:
Writer-Preferring Bitset Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 2.2 | 2.8 | 4.9 | 6.5 | 5.3
The above lock is unfortunately quite a bit slower than the Posix locks when there are many readers. Only when the writer count is unreasonably large does it perform well.
The reader-preferring variant is a bit simpler because the read-lock routine doesn't have to wait for writers:
/* Sleeping Read-Write Lock (reader preferring) */
typedef union rwlock_t rwlock_t;
union rwlock_t
{
    unsigned long long ull;
    unsigned u;
    struct
    {
        unsigned char wlocked;
        unsigned char wcontended;
        unsigned char rlocked;
        unsigned char rcontended;
    } b;
};

#define RW_WLOCKED   (1ULL << 0)
#define RW_WCONTEND  (1ULL << 8)
#define RW_RLOCKED   (1ULL << 16)
#define RW_RCONTEND  (1ULL << 24)
#define RW_READS     (1ULL << 32)

#define RW_WRITER 1
#define RW_READER 2

void rwlock_wlock(rwlock_t *rw)
{
    while (1)
    {
        /* Atomic read of lock state */
        unsigned state = rw->u;
        barrier();

        if (!(state & RW_WLOCKED))
        {
            if (!(state & RW_RLOCKED))
            {
                /* If no readers, set write-locked bit */
                if (cmpxchg(&rw->ull, state, state | RW_WLOCKED) == state) return;
            }

            /* Wake sleeping readers, if there are any */
            if (rw->b.rcontended)
            {
                rw->b.rcontended = 0;
                if (!sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, NULL, NULL, RW_READER)) continue;
            }
        }

        /* Wait until not write-locked */
        rw->b.wcontended = 1;
        state |= RW_WCONTEND;
        sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_WRITER);

        /* Other writers may exist */
        rw->b.wcontended = 1;
        barrier();
    }
}

void rwlock_wunlock(rwlock_t *rw)
{
    /* Unlock */
    rw->b.wlocked = 0;
    barrier();

    /* Wake all readers */
    if (rw->b.rcontended)
    {
        rw->b.rcontended = 0;
        if (sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, NULL, NULL, RW_READER)) return;
    }

    /* Wake a writer */
    if (rw->b.wcontended)
    {
        rw->b.wcontended = 0;
        sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER);
    }
}

void rwlock_rlock(rwlock_t *rw)
{
    unsigned long long state = atomic_xadd(&rw->ull, RW_READS);

    while (1)
    {
        /* Success? */
        if (!(state & RW_WLOCKED))
        {
            /* Set read-locked bit so write-lock futex wait works correctly */
            if (!(state & RW_RLOCKED)) rw->b.rlocked = 1;
            return;
        }

        /* Wait until not write-locked */
        state |= RW_RCONTEND;
        rw->b.rcontended = 1;
        sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_READER);

        state = rw->ull;
        barrier();
    }
}

void rwlock_runlock(rwlock_t *rw)
{
    /* Read unlock */
    unsigned long long state = atomic_add(&rw->ull, -RW_READS);
    int i;

    while (1)
    {
        /* Other readers there, don't do anything */
        if (state >> 32) return;

        /* Try to turn off read-locked flag */
        if (cmpxchg(&rw->ull, state, state & ~RW_RLOCKED) == state) break;

        /* Failure, try again */
        state = rw->ull;
        cpu_relax();
    }

    /* Spin and hope someone takes the lock, or have no one waiting */
    for (i = 0; i < 200; i++)
    {
        if (rw->b.wlocked || rw->b.rlocked || !rw->b.wcontended) return;
        cpu_relax();
    }

    /* We need to wake someone up */
    rw->b.wcontended = 0;
    sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER);
}
Reader-Preferring Bitset Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 2.1 | 2.9 | 4.9 | 5.4 | 6.5
The read-preferring variant is also very slow compared to the Posix locks when there are many readers. However, it is the fastest so far for the case where the number of readers and writers is the same. There are two reasons for this slow-down. The first is that gcc doesn't know that the Futex syscall doesn't alter any registers, so it saves and restores them all. In order to do this, it constructs a stack frame for each function. Unfortunately, the overhead of doing this can be quite large compared to the very little work done in the fast paths. Thus, to get optimal speed, using assembly is required (just as Glibc has done).
The second problem is the bitset Futex API itself. The problem here is that the kernel needs to scan the entire waitlist. Since the waitlist may contain both readers and writers, instead of just one or the other, the length of this list is increased. This increases the length of time the kernel holds the internal locks protecting the Futex waitlist, and thus reduces scalability. It probably doesn't matter much with only four threads hammering on the read-write lock API, but it will once hundreds, thousands, or millions of threads do. To short-circuit this problem, we can simply make readers and writers wait on different lists.
Futex Eventcount Locks
The simplest way of splitting the single bitset-based lock into two is to use eventcounts. These allow a thread to notice whether some event has occurred since it registered its interest in that event. If the event hasn't occurred, the thread may then sleep until it does. This allows threads to notice when other threads alter the lock state, without some sort of global mutex synchronizing everything.
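The basic pattern, extracted as a sketch (our own illustration; the code below inlines it, using a sequence increment of 256 so that the low byte is free to act as a contended flag):

/* A minimal eventcount: the sequence number lives in the upper bytes,
   and the low byte records whether anyone is sleeping. */
typedef union eventcount eventcount;
union eventcount
{
    unsigned u;
    unsigned char contend;
};

/* Snapshot the count before testing the condition you care about */
static unsigned ec_get(eventcount *ec)
{
    return ec->u | 1;
}

/* Sleep, unless the count has moved on since ec_get() */
static void ec_wait(eventcount *ec, unsigned key)
{
    ec->contend = 1;
    barrier();
    sys_futex(ec, FUTEX_WAIT_PRIVATE, key, NULL, NULL, 0);
}

/* Advance the count, waking any sleepers */
static void ec_signal(eventcount *ec)
{
    atomic_add(&ec->u, 256);
    if (ec->contend)
    {
        ec->contend = 0;
        sys_futex(ec, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
    }
}

A waiter snapshots the count, re-checks its condition, and only sleeps if nothing has been signalled in between; this closes the lost-wakeup window without needing a mutex.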
The more complex writer-preferring algorithm using eventcounts looks like:
/* Sleeping Read-Write Lock using event counts (writer preferring) */
typedef struct rwlock_ec rwlock_ec;
struct rwlock_ec
{
    union
    {
        unsigned u;
        unsigned char wlocked;
    } l;

    union
    {
        unsigned u;
        unsigned char contend;
    } write_ec;

    union
    {
        unsigned u;
        unsigned char contend;
    } read_ec;
};

#define RW_WLOCKED    (1ULL << 0)
#define RW_READS      (1ULL << 8)

#define RW_EC_CONTEND (1ULL << 0)
#define RW_EC_INC     (1ULL << 8)

void rwlock_wlock(rwlock_ec *rw)
{
    while (1)
    {
        unsigned ec = rw->write_ec.u | RW_EC_CONTEND;

        /* Try to set write-locked if there are no readers */
        if (!cmpxchg(&rw->l.u, 0, RW_WLOCKED)) return;

        /* Wait */
        rw->write_ec.contend = 1;
        sys_futex(&rw->write_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);

        /* Other writers may exist */
        rw->write_ec.contend = 1;
        barrier();
    }
}

void rwlock_wunlock(rwlock_ec *rw)
{
    /* Unlock */
    rw->l.wlocked = 0;

    /* Wake a writer */
    atomic_add(&rw->write_ec.u, RW_EC_INC);
    if (rw->write_ec.contend)
    {
        rw->write_ec.contend = 0;
        if (sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0)) return;
    }

    /* Wake all readers */
    atomic_add(&rw->read_ec.u, RW_EC_INC);
    if (rw->read_ec.contend)
    {
        rw->read_ec.contend = 0;
        sys_futex(&rw->read_ec, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
    }
}

void rwlock_rlock(rwlock_ec *rw)
{
    while (1)
    {
        unsigned ec = rw->read_ec.u | RW_EC_CONTEND;
        unsigned state;

        /* Make sure event count is read before trying to take lock */
        barrier();

        state = atomic_xadd(&rw->l.u, RW_READS);

        /* Success? */
        if (!(state & RW_WLOCKED) && !rw->write_ec.contend) return;

        /* Read unlock */
        state = atomic_xadd(&rw->l.u, -RW_READS);

        /*
         * Is it not write locked yet, but a writer is sleeping,
         * and we are at the readlocked -> writelocked transition.
         */
        if (state == RW_READS)
        {
            atomic_add(&rw->write_ec.u, RW_EC_INC);
            if (rw->write_ec.contend)
            {
                rw->write_ec.contend = 0;

                /* Wake the writer, and then try again */
                sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
                continue;
            }
        }

        rw->read_ec.contend = 1;
        barrier();

        /* No need to enter the kernel... quickly test beforehand */
        if (rw->read_ec.u != ec) continue;

        /* Wait */
        sys_futex(&rw->read_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);
    }
}

void rwlock_runlock(rwlock_ec *rw)
{
    /* Read unlock */
    unsigned state = atomic_add(&rw->l.u, -RW_READS);

    /* Other readers there, don't do anything */
    if (state >> 8) return;

    /* We need to wake someone up */
    atomic_add(&rw->write_ec.u, RW_EC_INC);
    if (rw->write_ec.contend)
    {
        rw->write_ec.contend = 0;
        sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
    }
}
The reader-preferring version is again slightly simpler:
/* Sleeping Read-Write Lock using event counts (reader preferring) */
#define RW_WLOCKED    (1ULL << 0)
#define RW_READS      (1ULL << 8)

#define RW_EC_CONTEND (1ULL << 0)
#define RW_EC_INC     (1ULL << 8)

void rwlock_wlock(rwlock_ec *rw)
{
    while (1)
    {
        unsigned ec = rw->write_ec.u;

        /* Try to set write-locked if there are no readers */
        if (!cmpxchg(&rw->l.u, 0, RW_WLOCKED)) return;

        rw->write_ec.contend = 1;
        ec |= RW_EC_CONTEND;
        sys_futex(&rw->write_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);

        /* Other writers may exist */
        rw->write_ec.contend = 1;
        barrier();
    }
}

void rwlock_wunlock(rwlock_ec *rw)
{
    /* Unlock */
    rw->l.wlocked = 0;

    /* Wake all readers */
    atomic_add(&rw->read_ec.u, RW_EC_INC);
    if (rw->read_ec.contend)
    {
        rw->read_ec.contend = 0;
        if (sys_futex(&rw->read_ec, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0)) return;
    }

    /* Wake a writer */
    atomic_add(&rw->write_ec.u, RW_EC_INC);
    if (rw->write_ec.contend)
    {
        rw->write_ec.contend = 0;
        sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
    }
}

void rwlock_rlock(rwlock_ec *rw)
{
    unsigned ec = rw->read_ec.u;
    unsigned state;

    /* Make sure event count is read before trying to take lock */
    barrier();

    state = atomic_xadd(&rw->l.u, RW_READS);

    while (state & RW_WLOCKED)
    {
        ec |= RW_EC_CONTEND;

        /* Sleep until no longer held by a writer */
        rw->read_ec.contend = 1;
        sys_futex(&rw->read_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);

        ec = rw->read_ec.u;
        barrier();
        state = rw->l.u;
    }
}

void rwlock_runlock(rwlock_ec *rw)
{
    /* Read unlock */
    unsigned state = atomic_add(&rw->l.u, -RW_READS);

    /* Other readers there, don't do anything */
    if (state) return;

    /* We need to wake someone up */
    atomic_add(&rw->write_ec.u, RW_EC_INC);
    if (rw->write_ec.contend)
    {
        rw->write_ec.contend = 0;
        sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
    }
}
However, for optimal speed, we need to use assembly language. The optimized writer-preferring version then is:
.globl rwlock_wlock
.type rwlock_wlock, @function
.align 16
rwlock_wlock:
1: mov $0x1, %ecx
xor %eax, %eax
mov 0x4(%rdi), %edx
# Try to set write-locked
lock cmpxchg %ecx, (%rdi)
jne 2f
rep; retq
# Failed, go to sleep
2: or $0x1, %dl
mov $SYS_futex, %eax
xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
movb $0x1, 0x4(%rdi)
add $0x4, %rdi
syscall
movb $0x1, (%rdi)
sub $0x4, %rdi
jmp 1b
.size rwlock_wlock, .-rwlock_wlock
.globl rwlock_wunlock
.type rwlock_wunlock, @function
.align 16
rwlock_wunlock:
mov $0x100, %eax
add $0x4, %rdi
movb $0x0, -0x4(%rdi)
# Any writers sleeping?
lock addl %eax, (%rdi)
cmp $0x0, (%rdi)
jne 2f
# Any readers sleeping?
lock xadd %eax, 0x4(%rdi)
test %al, %al
jne 3f
1: rep; retq
# Wake a writer
2: mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x1, %edx
movb $0x0, (%rdi)
syscall
test %eax, %eax
jne 1b
# Any readers sleeping?
mov $0x100, %eax
lock xadd %eax, 0x4(%rdi)
test %al, %al
je 1b
# Wake all readers
3: add $0x4, %rdi
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x7fffffff, %edx
movb $0x0, (%rdi)
syscall
retq
.size rwlock_wunlock, .-rwlock_wunlock
.globl rwlock_rlock
.type rwlock_rlock, @function
.align 16
rwlock_rlock:
add $0x8, %rdi
1: mov $0x100, %eax
mov (%rdi), %edx
mov %eax, %ecx
# Attempt to grab lock
lock xadd %eax, -0x8(%rdi)
or -0x4(%rdi), %al
jne 2f
rep; retq
# Failed, unlock
2: mov $SYS_futex, %eax
lock sub %ecx, -0x8(%rdi)
je 4f
# Sleep on read eventcount
3: or $0x1, %dl
mov $FUTEX_WAIT_PRIVATE, %esi
xor %r10, %r10
movb $0x1, (%rdi)
syscall
jmp 1b
# Test to see if any writers are sleeping
4: lock add %ecx, -0x4(%rdi)
cmpb $0x0, -0x4(%rdi)
je 3b
# Wake a writer, and then try again
sub $0x4, %rdi
mov $FUTEX_WAKE_PRIVATE, %esi
mov %rdx, %r8
movb $0x0, (%rdi)
mov $0x1, %edx
syscall
add $0x4, %rdi
mov %r8, %rdx
jmp 3b
.size rwlock_rlock, .-rwlock_rlock
.globl rwlock_runlock
.type rwlock_runlock,@function
.align 16
rwlock_runlock:
# One less reader
mov $0x100, %eax
lock sub %eax, (%rdi)
je 1f
rep; retq
# Do we need to wake someone up?
1: lock xaddl %eax, 0x4(%rdi)
test %al, %al
jne 2f
rep; retq
# Wake via the write eventcount
2: add $0x4, %rdi
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x1, %edx
movb $0x0, (%rdi)
syscall
retq
.size rwlock_runlock, .-rwlock_runlock
This code gives the following timings:
Writer-Preferring Eventcount Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 2.1 | 2.6 | 4.7 | 7.3 | 8.5
These results are slightly faster than the bitset locks for larger reader fractions, but still horrible compared to the pthreads code. In addition, they are much worse for large writer fractions.
The reader-preferring variant looks like:
.globl rwlock_wlock
.type rwlock_wlock, @function
.align 16
rwlock_wlock:
1: mov $0x1, %ecx
xor %eax, %eax
mov 0x4(%rdi), %edx
# Try to set to write-locked
lock cmpxchg %ecx, (%rdi)
jne 2f
rep; retq
# Failed, sleep until woken via an eventcount
2: or $0x1, %dl
mov $SYS_futex, %eax
xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
movb $0x1, 0x4(%rdi)
add $0x4, %rdi
syscall
movb $0x1, (%rdi)
sub $0x4, %rdi
jmp 1b
.size rwlock_wlock, .-rwlock_wlock
.globl rwlock_wunlock
.type rwlock_wunlock, @function
.align 16
rwlock_wunlock:
mov $0x100, %eax
movb $0x0, (%rdi)
# Are there any sleeping readers?
lock add %eax, 0x8(%rdi)
cmpb $0x0, 0x8(%rdi)
jne 2f
# Are there any sleeping writers?
lock xadd %eax, 0x4(%rdi)
test %al, %al
jne 3f
1: rep; retq
# Wake all readers
2: movb $0x0, 0x8(%rdi)
mov $0x7fffffff, %rdx
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
add $0x8, %rdi
syscall
test %eax, %eax
jne 1b
# Any sleeping writers?
lock addl $0x100, -0x4(%rdi)
cmpb $0x0, -0x4(%rdi)
je 1b
sub $0x8, %rdi
# Wake a writer
3: movb $0x0, 0x4(%rdi)
mov $0x1, %edx
add $0x4, %rdi
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
syscall
retq
.size rwlock_wunlock, .-rwlock_wunlock
.globl rwlock_rlock
.type rwlock_rlock, @function
.align 16
rwlock_rlock:
mov $0x100, %eax
mov 0x8(%rdi), %edx
# Attempt to take the lock
lock xadd %eax, (%rdi)
test %al, %al
jne 1f
rep; retq
# Failure, sleep until not held by a writer
1: xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
add $0x8, %rdi
2: or $0x1, %dl
mov $SYS_futex, %eax
mov %dl, (%rdi)
syscall
mov (%rdi), %edx
cmpb $0x1, -0x8(%rdi)
je 2b
rep; retq
.size rwlock_rlock, .-rwlock_rlock
.globl rwlock_runlock
.type rwlock_runlock,@function
.align 16
rwlock_runlock:
# One less reader
mov $0x100, %eax
lock sub %eax, (%rdi)
je 1f
rep; retq
# Do we need to wake someone up?
1: lock xaddl %eax, 0x4(%rdi)
test %al, %al
jne 2f
rep; retq
# Wake a writer
2: add $0x4, %rdi
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x1, %edx
movb $0x0, (%rdi)
syscall
retq
.size rwlock_runlock, .-rwlock_runlock
This gives the following benchmark results:
Reader-Preferring Eventcount Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 2.1 | 2.9 | 4.6 | 5.7 | 6.3
These results are slightly worse in the reader-dominated cases, but much better in the writer-dominated ones. They still aren't nearly as good as the Glibc results.
Fair Read-Write Locks
The above locks are either reader-preferring or writer-preferring. However, there is a third type of read-write lock, one that prefers neither. Such a "fair lock" maintains an order such that neither readers nor writers can be starved. This can improve throughput, since forward progress is guaranteed. The simplest way to create such a lock uses two separate mutexes. One mutex maintains the ordering, and the second controls the mutual exclusion between readers and writers:
/* Fair rwlock using two mutexes */
typedef struct fair_rwlock fair_rwlock;
struct fair_rwlock
{
    mutex m;             /* Maintains the queue order */
    mutex writer;        /* Held by the writer, or by the readers collectively */
    unsigned read_count;
};

void rwlock_wlock(fair_rwlock *rw)
{
    /* Use mutex m to communicate with read-lock code */
    mutex_lock(&rw->m);
    mutex_lock(&rw->writer);
    mutex_unlock(&rw->m);
}

void rwlock_wunlock(fair_rwlock *rw)
{
    mutex_unlock(&rw->writer);
}

void rwlock_rlock(fair_rwlock *rw)
{
    /* The first reader also locks the write mutex */
    mutex_lock(&rw->m);
    if (!atomic_xadd(&rw->read_count, 1)) mutex_lock(&rw->writer);
    mutex_unlock(&rw->m);
}

void rwlock_runlock(fair_rwlock *rw)
{
    /* The last reader unlocks the write mutex */
    if (atomic_xadd(&rw->read_count, -1) == 1) mutex_unlock(&rw->writer);
}
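Usage is symmetric with the other locks in this article. A quick sketch (MUTEX_INITIALIZER is a placeholder for however the mutex type from the earlier article is initialized):

static fair_rwlock rw = {MUTEX_INITIALIZER, MUTEX_INITIALIZER, 0};

void *reader_thread(void *unused)
{
    rwlock_rlock(&rw);
    /* ... read the protected state ... */
    rwlock_runlock(&rw);
    return NULL;
}

void *writer_thread(void *unused)
{
    rwlock_wlock(&rw);
    /* ... modify the protected state ... */
    rwlock_wunlock(&rw);
    return NULL;
}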
Notice how in the above, most of the complexity has been hidden within the mutex implementation. We will use a low-level lock the size of a Futex (integer) for this:
.globl ll_lock
.type ll_lock,@function
.align 16
ll_lock:
xor %ecx, %ecx
# Spin a bit to try to get lock
1: mov $1, %al
xchgb (%rdi), %al
test %al, %al
jz 3f
pause
add $-1, %cl
jnz 1b
# Set up syscall details
mov $0x0101, %edx
mov $FUTEX_WAIT_PRIVATE, %esi
xor %r10, %r10
# Wait loop
2: mov $SYS_futex, %eax
syscall
mov %edx, %eax
xchgl (%rdi), %eax
test $1, %eax
jnz 2b
3: rep; ret
.size ll_lock, .-ll_lock
.globl ll_unlock
.type ll_unlock,@function
.align 16
ll_unlock:
xor %ecx, %ecx
cmpl $1, (%rdi)
jne 1f
mov $1, %eax
# Fast path... no one is sleeping
lock; cmpxchgl %ecx, (%rdi)
jz 3f
1: movb $0, (%rdi)
# Spin, and hope someone takes the lock
2: testb $1, (%rdi)
jnz 3f
pause
add $-1, %cl
jnz 2b
# Wake up someone
movb $0, 1(%rdi)
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $1, %edx
syscall
3: ret
.size ll_unlock, .-ll_unlock
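In C, the pair of routines above corresponds roughly to the following (a sketch for readability; the assembly is what was actually benchmarked). Byte 0 of the integer is the locked flag, and byte 1 is the contention flag:

void ll_lock(unsigned *l)
{
    int i;

    /* Spin a bit to try to get the lock */
    for (i = 0; i < 256; i++)
    {
        if (!xchg_8(l, 1)) return;
        cpu_relax();
    }

    /* Mark as locked + contended, and sleep until we get the lock */
    while (xchg_32(l, 0x0101) & 1)
    {
        sys_futex(l, FUTEX_WAIT_PRIVATE, 0x0101, NULL, NULL, 0);
    }
}

void ll_unlock(unsigned *l)
{
    int i;

    /* Fast path: locked but uncontended */
    if ((*l == 1) && (cmpxchg(l, 1, 0) == 1)) return;

    /* Unlock, then spin hoping someone takes the lock without a syscall */
    *(unsigned char *) l = 0;
    barrier();

    for (i = 0; i < 256; i++)
    {
        if (*(unsigned char *) l) return;
        cpu_relax();
    }

    /* Clear the contention flag and wake one sleeper */
    ((unsigned char *) l)[1] = 0;
    sys_futex(l, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}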
Using the above primitives, we can construct the asm versions of the mutex-based fair read-write lock. The main trick here is that the asm routines do not clobber %rdi, which holds the address of the mutex. This means that the calling routine does not have to save and restore this register, avoiding the construction of a stack frame.
.globl rwlock_wlock
.type rwlock_wlock,@function
.align 16
rwlock_wlock:
call ll_lock
add $0x4, %rdi
call ll_lock
sub $0x4, %rdi
jmp ll_unlock
.size rwlock_wlock, .-rwlock_wlock
.globl rwlock_wunlock
.type rwlock_wunlock,@function
.align 16
rwlock_wunlock:
add $0x4, %rdi
jmp ll_unlock
.size rwlock_wunlock, .-rwlock_wunlock
.globl rwlock_rlock
.type rwlock_rlock,@function
.align 16
rwlock_rlock:
call ll_lock
mov $1, %eax
# Are we the last reader?
lock xadd %eax, 0x8(%rdi)
test %eax, %eax
jnz ll_unlock
add $0x4, %rdi
call ll_lock
sub $0x4, %rdi
jmp ll_unlock
.size rwlock_rlock, .-rwlock_rlock
.globl rwlock_runlock
.type rwlock_runlock,@function
.align 16
rwlock_runlock:
lock addl $-1, 0x8(%rdi)
jz rwlock_wunlock
rep; ret
.size rwlock_runlock, .-rwlock_runlock
The benchmark results of the mutex-based fair lock are:
Mutex Fair Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 1.1 | 1.3 | 2.6 | 6.0 | 6.1
This lock is amazingly fast, beating or equalling the Posix locks in all situations. However, some of the other lock implementations above do beat it when the number of readers is the same as the number of writers. Perhaps we can improve speed some more by noticing that the above algorithm doesn't exploit the fact that multiple readers may be awoken simultaneously when a reader gains control of the lock. The simplest way of arranging this using "standard" threading primitives is to use a condition variable. By broadcasting to the condition variable, multiple threads can be awoken at once. C code that does this looks like:
typedef struct fair_rwlock fair_rwlock;
struct fair_rwlock
{
    mutex m;
    mutex waitlist;
    cv read_wait;
    unsigned read_count;
    unsigned write_count;
    char read_flag;
};

void rwlock_wlock(fair_rwlock *rw)
{
    /* Use mutex m to synchronize the lock state with the read path */
    mutex_lock(&rw->m);
    rw->write_count++;
    mutex_unlock(&rw->m);

    mutex_lock(&rw->waitlist);
}

void rwlock_wunlock(fair_rwlock *rw)
{
    /* Use mutex m to synchronize the lock state with the read path */
    mutex_lock(&rw->m);
    rw->write_count--;
    mutex_unlock(&rw->m);

    mutex_unlock(&rw->waitlist);
}

void rwlock_rlock(fair_rwlock *rw)
{
    mutex_lock(&rw->m);

    while (1)
    {
        /* Fast path, only readers */
        if (!rw->write_count && rw->read_count)
        {
            rw->read_count++;
            mutex_unlock(&rw->m);
            return;
        }

        /* Any other waiting readers? */
        if (!rw->read_flag) break;

        /* Have a waiting reader, append to wait-list */
        cond_wait(&rw->read_wait, &rw->m);

        /* If other readers are there, then we can read-lock. */
        if (rw->read_count)
        {
            rw->read_count++;
            mutex_unlock(&rw->m);
            return;
        }
    }

    /* Need to wait until writers are done */
    rw->read_flag = 1;
    mutex_unlock(&rw->m);

    mutex_lock(&rw->waitlist);

    mutex_lock(&rw->m);

    /* Wake other waiting readers, and lock the write-lock */
    rw->read_flag = 0;
    rw->read_count++;
    cond_broadcast(&rw->read_wait);
    mutex_unlock(&rw->m);
}

void rwlock_runlock(fair_rwlock *rw)
{
    mutex_lock(&rw->m);

    rw->read_count--;
    if (!rw->read_count)
    {
        /* The last reader unlocks the write-lock */
        mutex_unlock(&rw->waitlist);
    }

    mutex_unlock(&rw->m);
}
And similarly, in assembly language:
.globl rwlock_wlock
.type rwlock_wlock,@function
.align 16
rwlock_wlock:
call ll_lock
addl $1, 0x10(%rdi)
call ll_unlock
add $0x4, %rdi
jmp ll_lock
.size rwlock_wlock, .-rwlock_wlock
.globl rwlock_wunlock
.type rwlock_wunlock,@function
.align 16
rwlock_wunlock:
call ll_lock
subl $1, 0x10(%rdi)
call ll_unlock
add $0x4, %rdi
jmp ll_unlock
.size rwlock_wunlock, .-rwlock_wunlock
.globl rwlock_rlock
.type rwlock_rlock,@function
.align 16
rwlock_rlock:
call ll_lock
1: cmpl $0, 0x10(%rdi)
jne 2f
cmpl $0, 0xc(%rdi)
je 2f
addl $1, 0xc(%rdi)
jmp ll_unlock
2: cmpb $0, 0x14(%rdi)
je 3f
# Save seq for condvar
movl 0x8(%rdi), %r8d
# Unlock mutex
call ll_unlock
add $0x8, %rdi
# Wait on condvar
mov %r8d, %edx
mov $SYS_futex, %eax
xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
movb $1, (%rdi)
syscall
# Lock the mutex again
sub $0x8, %rdi
call ll_lock
# Mark as contended
movb $1, 0x1(%rdi)
cmpl $0, 0xc(%rdi)
je 1b
addl $1, 0xc(%rdi)
jmp ll_unlock
# We are the first reader - write lock it
3: movb $1, 0x14(%rdi)
call ll_unlock
add $0x4, %rdi
call ll_lock
sub $0x4, %rdi
call ll_lock
# Convert to a read lock
movb $0, 0x14(%rdi)
mov $0x100, %eax
addl $1, 0xc(%rdi)
# Any readers waiting?
lock xadd %eax, 0x8(%rdi)
test $1, %al
jz ll_unlock
# Wake them
mov %rdi, %r8
add $0x8, %rdi
mov $SYS_futex, %eax
mov $FUTEX_REQUEUE_PRIVATE, %esi
mov $1, %edx
mov $0x7fffffff, %r10
syscall
sub $0x8, %rdi
jmp ll_unlock
.size rwlock_rlock, .-rwlock_rlock
.globl rwlock_runlock
.type rwlock_runlock,@function
.align 16
rwlock_runlock:
call ll_lock
subl $1, 0xc(%rdi)
jnz ll_unlock
call ll_unlock
add $0x4, %rdi
jmp ll_unlock
.size rwlock_runlock, .-rwlock_runlock
This algorithm benchmarks as:
Mutex + CV Fair Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 1.6 | 4.2 | 7.8 | 6.9 | 5.4
This performs horribly for medium numbers of readers, but very well for large numbers of writers. However, again, such a situation would usually be solved through the use of a simple mutex rather than a read-write lock. The reason for the poor performance is that the condition variable at the lowest level doesn't behave the way we would wish. Instead of waking all sleeping readers, it wakes one and re-queues the rest on the mutex. We can fix this by replacing the condvar with an eventcount. Similarly, we can merge the waiter count with the mutex, and the read flag with the reader count, shrinking the lock data structure quite a bit. Doing this yields the rather complex:
typedef struct fair_rwlock fair_rwlock;
struct fair_rwlock
{
    union
    {
        unsigned long long l;  /* Entire lock state */
        unsigned waiters;      /* Low word: locked byte + writer-waiter count */
        unsigned char locked;  /* Lowest byte: the mutex locked flag */
    } lock;

    union
    {
        unsigned seq;          /* Eventcount sequence in the upper bytes */
        unsigned char contend; /* Low byte: reader contention flag */
    } read_wait;
};

/*
 * lock.l layout: bit 0 locked, bit 1 readers-may-enter, bits 8+ count
 * waiting writers, bit 32 is the read flag, and bits 33+ count readers.
 */

void rwlock_wlock(fair_rwlock *rw)
{
    int i;

    /* There is a waiting writer */
    atomic_add(&rw->lock.waiters, 256);

    /* Spin for a bit to try to get the lock */
    for (i = 0; i < 100; i++)
    {
        if (!(xchg_8(&rw->lock.locked, 1) & 1)) return;
        cpu_relax();
    }

    /* Failed, so we need to sleep */
    while (xchg_8(&rw->lock.locked, 1) & 1)
    {
        sys_futex(&rw->lock, FUTEX_WAIT_PRIVATE, rw->lock.waiters | 1, NULL, NULL, 0);
    }
}

void rwlock_wunlock(fair_rwlock *rw)
{
    /* One less writer and also unlock simultaneously */
    if (atomic_add(&rw->lock.waiters, -257))
    {
        int i;

        /* Spin for a bit hoping someone will take the lock */
        for (i = 0; i < 200; i++)
        {
            if (rw->lock.locked) return;
            cpu_relax();
        }

        /* Failed, we need to wake someone */
        sys_futex(&rw->lock, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
    }
}

void rwlock_rlock(fair_rwlock *rw)
{
    while (1)
    {
        unsigned long long val;
        unsigned seq = rw->read_wait.seq;
        barrier();

        val = rw->lock.l;

        /* Fast path, no readers or writers */
        if (!val)
        {
            if (!cmpxchg(&rw->lock.l, 0, 1 + 2 + (1ULL << 33))) return;

            /* Failed, try again */
            cpu_relax();
            continue;
        }

        /* Fast path, only readers */
        if ((((unsigned) val) == 1 + 2) && (val >> 33))
        {
            if (cmpxchg(&rw->lock.l, val, val + (1ULL << 33)) == val) return;

            /* Failed, try again */
            cpu_relax();
            continue;
        }

        /* read flag set? */
        if (val & (1ULL << 32))
        {
            /* Have a waiting reader, append to wait-list */
            rw->read_wait.contend = 1;
            sys_futex(&rw->read_wait, FUTEX_WAIT_PRIVATE, seq | 1, NULL, NULL, 0);

            val = rw->lock.l;

            /* If there are readers, we can take the lock */
            if (((val & 3) == 3) && (val >> 33))
            {
                /* Grab a read lock so long as we are allowed to */
                if (cmpxchg(&rw->lock.l, val, val + (1ULL << 33)) == val) return;
            }

            /* Too slow, try again */
            cpu_relax();
            continue;
        }

        /* Try to set read flag + waiting */
        if (cmpxchg(&rw->lock.l, val, val + 256 + (1ULL << 32)) == val) break;
        cpu_relax();
    }

    /* Grab write lock */
    while (xchg_8(&rw->lock.locked, 1) & 1)
    {
        sys_futex(&rw->lock, FUTEX_WAIT_PRIVATE, rw->lock.waiters | 1, NULL, NULL, 0);
    }

    /* Convert write lock into read lock. Use 2-bit to denote that readers can enter */
    atomic_add(&rw->lock.l, 2 - 256 - (1ULL << 32) + (1ULL << 33));

    /* We are waking everyone up */
    if (atomic_xadd(&rw->read_wait.seq, 256) & 1)
    {
        rw->read_wait.contend = 0;

        /* Wake all the waiting readers */
        sys_futex(&rw->read_wait, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
    }
}

void rwlock_runlock(fair_rwlock *rw)
{
    /* Unlock... and test to see if we are the last reader */
    if (!(atomic_add(&rw->lock.l, -(1ULL << 33)) >> 33))
    {
        rw->lock.locked = 0;
        barrier();

        /* We need to wake someone up? */
        if (rw->lock.waiters)
        {
            sys_futex(&rw->lock, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
        }
    }
}
Or, in asm:
.globl rwlock_wlock
.type rwlock_wlock,@function
.align 16
rwlock_wlock:
# There is now a waiting writer
lock addl $0x100, (%rdi)
# Try to write-lock
1: mov $0x1, %al
xchg %al, (%rdi)
test $0x1, %al
jnz 2f
rep; retq
# Wait until we can try to take the lock
2: mov (%rdi), %edx
mov $SYS_futex, %eax
or $1, %dl
xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
syscall
jmp 1b
.size rwlock_wlock, .-rwlock_wlock
.globl rwlock_wunlock
.type rwlock_wunlock,@function
.align 16
rwlock_wunlock:
# One less writer + unlock simultaneously
mov $0xfffffeff, %eax
lock xadd %eax, (%rdi)
cmp $0x101, %eax
jne 1f
rep; retq
# We need to wake someone
1: mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x1, %edx
syscall
retq
.size rwlock_wunlock, .-rwlock_wunlock
.globl rwlock_rlock
.type rwlock_rlock,@function
.align 16
runlock_c1:
.quad 0x200000003
runlock_c2:
.quad 0x200000000
runlock_c3:
.quad 0x100000100
rwlock_rlock:
mov (%rdi), %rax
jmp 2f
1: pause
2: test %rax, %rax
jnz 3f
# Fast path - no one has the lock, so try to grab it
mov runlock_c1(%rip), %rcx
lock cmpxchg %rcx, (%rdi)
jnz 3f
rep; retq
# Are there other readers but no writers?
3: mov %rax, %rcx
cmp $0x3, %eax
jne 5f
shr $0x21, %rcx
je 5f
4: mov %rax, %rcx
add runlock_c2(%rip), %rcx
lock cmpxchg %rcx, (%rdi)
jnz 1b
rep; retq
# Is there a waiting reader?
5: mov 8(%rdi), %edx
testb $0x1, 4(%rdi)
je 6f
# Wait until that reader is woken
add $0x8, %rdi
orb $0x1, %dl
xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
mov $SYS_futex, %eax
movb $0x1, (%rdi)
syscall
# Try to grab the lock once we are woken
mov -0x8(%rdi), %rax
sub $0x8, %rdi
cmp $0x3, %al
jne 1b
mov %rax, %rcx
shr $0x21, %rcx
je 1b
jmp 4b
# Try to be the first reader
6: mov %rax, %rcx
add runlock_c3(%rip), %rcx
lock cmpxchg %rcx, (%rdi)
jnz 1b
# We are the first reader to wait
xor %r10, %r10
mov $FUTEX_WAIT_PRIVATE, %esi
# Try to get the write lock
7: mov $0x1, %al
xchg %al, (%rdi)
test $0x1, %al
jz 8f
# Failed, sleep until we are next in line
mov (%rdi), %edx
mov $SYS_futex, %eax
or $1, %dl
syscall
jmp 7b
# Convert into a read lock
8: mov $0xffffff02, %eax
lock add %rax, (%rdi)
mov $0x100, %eax
# Are there any sleeping readers?
lock xadd %eax, 0x8(%rdi)
test $0x1, %al
jz 9f
# Wake them
add $0x8, %rdi
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x7fffffff, %edx
movb $0x0, (%rdi)
syscall
9: retq
.size rwlock_rlock, .-rwlock_rlock
.globl rwlock_runlock
.type rwlock_runlock,@function
.align 16
rwlock_runlock:
mov $-2, %eax
mov %eax, %edx
# Unlock, and test to see if we are the last reader
lock xadd %eax, 4(%rdi)
add %edx, %eax
and %edx, %eax
jz 1f
rep; retq
# Is there anyone sleeping?
1: movb $0, (%rdi)
cmp %eax, (%rdi)
je 2f
# Wake someone
mov $SYS_futex, %eax
mov $FUTEX_WAKE_PRIVATE, %esi
mov $0x1, %edx
syscall
2: retq
.size rwlock_runlock, .-rwlock_runlock
Yielding the results:
Event Count Fair Lock
Writers per 256 |   0 |   1 |  25 | 128 | 250
Time (s)        | 1.1 | 1.6 | 6.9 | 9.3 | 7.7
This is much improved compared to the condition variable based lock, and comparable to the writer-preferring Posix lock. However, it is quite slow compared to the mutex-based fair lock. Unfortunately, there is a race on reader-thread wakeup. A waking reader may miss the window where it can take the lock, in which case it goes back to sleep. This causes a loss of performance. One might hope that making the newly-awakened readers spin with a cmpxchg loop would help. However, that produces even worse benchmark results.
Summary
The best performing read-write locks seem to be the simplest. Using complex algorithms with many atomic instructions seems to result in worse performance. Even though the Posix read-write locks in Glibc are implemented via a mutex, they perform extremely well. However, it is still possible to do better. By using a mutex-based fair read-write lock, one can gain extra speed. That particular lock is about twice as fast as the default reader-preferring lock when there are many readers, and around twice as fast as the writer-preferring lock when the numbers of readers and writers are similar. In other situations, it performs as well as the competition.
It may be possible to improve the fair lock by allowing multiple readers to awaken simultaneously. However, doing so is non-trivial. The reader which wakes the others may finish with the critical section before the wakeups are complete. This causes a race which is hard to plug without extra locking that slows things down more than what is gained. The other possibility, allowing such newly awakened readers to fail to grab the lock, weakens the fairness guarantees. This again results in a loss of performance due to starvation.
Comments
Samy Al Bahra said... The check for writer contention should be a "cmpb" and not "cmp". As it is, we'll make the wake system call for exclusive waiters every time we do a write unlock. I reconstructed the test scenario described here and found that the corrected code is very competitive with the native rwlocks.
Really the worst issue is with the interface itself. The acquisition and release semantics should be exposed to the consumer of the lock, which has never been done entirely (if I recall, the Windows kernel has a shared-acquire-starve-exclusive call which is a step in this direction).
I also flipped over and ran the scenario on Windows SRW locks. In all but the uncontested case (predictably, but sadly), they basically get toasted.
I dug into the SRW disassembly and found that, unsurprisingly, they're style over substance, with what appear to be some pretty silly things going on. I'd be happy to type something up on their design and send it on, if you think your readers would be interested.
@Borislav: I read through the article you posted and I'm not sure what you're asking about. Do you mean can a lock be constructed which separates the contention between readers and writers better?
The lock that Joe Duffy is describing is a C# algorithm being compared to other theoretical C# algorithms. The locks described above will more than likely beat it hands down in any scenario (the performance difference between C# and hand-optimized assembly is typically vast).
/* Wait until there are no writers */
-while (l->wwait || (l->wrlock && l->writer_preferred))
+while (l->wrlock || (l->wwait && l->writer_preferred))
{
l->rwait++;
...
Otherwise we cannot recursively rdlock as required by POSIX.
bool rwlock_wtrylock(fair_rwlock *rw)
{
    if (!mutex_trylock(&rw->m)) return false;

    if (mutex_trylock(&rw->writer))
    {
        mutex_unlock(&rw->m);
        return true;
    }

    mutex_unlock(&rw->m);
    return false;
}

bool rwlock_wtrylockuntil(fair_rwlock *rw, unsigned int tEnd)
{
    if (!mutex_trylockuntil(&rw->m, tEnd)) return false;

    if (mutex_trylockuntil(&rw->writer, tEnd))
    {
        mutex_unlock(&rw->m);
        return true;
    }

    mutex_unlock(&rw->m);
    return false;
}

bool rwlock_rtrylock(fair_rwlock *rw)
{
    if (!mutex_trylock(&rw->m)) return false;

    if (atomic_xadd(&rw->read_count, 1) || mutex_trylock(&rw->writer))
    {
        mutex_unlock(&rw->m);
        return true;
    }

    /* First reader failed to take the write mutex: undo the increment */
    atomic_xadd(&rw->read_count, -1);
    mutex_unlock(&rw->m);
    return false;
}

bool rwlock_rtrylockuntil(fair_rwlock *rw, unsigned int tEnd)
{
    if (!mutex_trylockuntil(&rw->m, tEnd)) return false;

    if (atomic_xadd(&rw->read_count, 1) || mutex_trylockuntil(&rw->writer, tEnd))
    {
        mutex_unlock(&rw->m);
        return true;
    }

    /* First reader failed to take the write mutex: undo the increment */
    atomic_xadd(&rw->read_count, -1);
    mutex_unlock(&rw->m);
    return false;
}