
Sleeping Read-Write Locks

Posix Locks

Read-write locks are useful when a large fraction of the users of a critical section do not change any of the state protected by the lock. This allows these "readers" or "shared owners" to potentially execute in parallel. On the other hand, since the protected state also needs to change, another type of locking is required for "writers" or "exclusive owners". (Obviously, if the data never changes, then no locking is required at all.) The complexity of having two types of ownership makes read-write locks slower than traditional mutexes. In general, this form of locking is only faster when readers outnumber writers.

If the length of time one holds the lock is small, then it may be worth using a spinning read-write lock. However, due to the overhead of this synchronization primitive, it is usually only worth using when the lock hold time is relatively long. If this is the case, then the waiters are better off sleeping within the kernel until the lock is released, as compared to wasting cycles spinning. On Microsoft Windows, such a locking API is implemented by the "Slim Read-write locks" (SRWLOCK). On Linux, and other Unix-based operating systems, the pthreads pthread_rwlock_t is available.
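For reference, the benchmarks below use the standard pthreads calls. A minimal usage sketch:

#include <pthread.h>

static pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
static int shared_state;

int reader(void)
{
	int value;

	/* Many readers may hold the lock simultaneously */
	pthread_rwlock_rdlock(&lock);
	value = shared_state;
	pthread_rwlock_unlock(&lock);

	return value;
}

void writer(int value)
{
	/* A writer gets exclusive ownership */
	pthread_rwlock_wrlock(&lock);
	shared_state = value;
	pthread_rwlock_unlock(&lock);
}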

Glibc implements these locks roughly as follows:


typedef struct rwlock_t rwlock_t;
struct rwlock_t
{
	mutex m;		/* Mutex protecting the lock state below */
	unsigned wrlock;	/* Non-zero when write-locked */
	unsigned rcount;	/* Number of readers holding the lock */
	unsigned wwait;		/* Number of waiting writers */
	unsigned rwait;		/* Number of waiting readers */
	char writer_preferred;	/* If set, waiting writers block new readers */
};

int rwlock_rdlock(rwlock_t *l)
{
	mutex_lock(&l->m);
	
	/* Wait until there are no writers */
	while (l->wrlock || (l->wwait && l->writer_preferred))
	{
		l->rwait++;
		mutex_unlock(&l->m);
		sys_futex(&l->rwait, FUTEX_WAIT_PRIVATE, 1, NULL, NULL, 0);
		mutex_lock(&l->m);

		l->rwait--;
	}
	
	/* There is a reader */
	l->rcount++;
			
	mutex_unlock(&l->m);
			
	return 0;
}

int rwlock_wrlock(rwlock_t *l)
{
	mutex_lock(&l->m);
	
	/* Wait until there are no readers */
	while (l->rcount || l->wrlock)
	{
		l->wwait++;
		mutex_unlock(&l->m);
		sys_futex(&l->wwait, FUTEX_WAIT_PRIVATE, 1, NULL, NULL, 0);
		mutex_lock(&l->m);

		l->wwait--;
	}
	
	/* There is a writer */
	l->wrlock = 1;

	mutex_unlock(&l->m);

	return 0;
}

/* pthreads has merged read and write unlock */
int rwlock_unlock(rwlock_t *l)
{
	mutex_lock(&l->m);
	
	/* If there are readers, then we are read-locked */
	if (l->rcount)
	{
		/* One less reader */
		l->rcount--;
		
		/* Still have other readers */
		if (l->rcount)
		{
			mutex_unlock(&l->m);
			return 0;
		}
	}
	else
	{
		/* No longer write locked */
		l->wrlock = 0;
	}
	mutex_unlock(&l->m);
	
	/* Wake waiting writers first */
	if (l->wwait)
	{
		sys_futex(&l->wwait, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
		return 0;
	}
	
	/* Then wake all waiting readers */
	if (l->rwait)
	{
		sys_futex(&l->rwait, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
	}

	return 0;
}

The above code is obviously not exactly how Glibc implements them as the real code is written in assembly and has different identifiers. However, the above is algorithmically how it works. Note how there is only one type of unlock routine (as mandated by the Posix specification), which determines whether to read or write unlock based on the current owner. There are also two different types of lock based on whether the writer_preferred parameter is set or not. If it is set, then waiting writers will prevent new readers from taking the lock. If it is unset, then new readers can preempt writers so long as other readers still have the lock. This gives two different sets of performance characteristics.

We benchmark the Posix read-write locks by having four threads (on this quad-core machine) randomly read and write lock the lock. They then spin-wait for a given number of cycles, and then unlock the lock. By varying the fraction of time we write-lock the lock as compared to read-lock it, we can see how this affects the speed.
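In outline, each benchmark thread does something like the following sketch. (This is a reconstruction of the harness described above; the iteration count, hold time and write fraction are illustrative, and cpu_relax is the spin-wait hint used throughout this article.)

#include <pthread.h>
#include <stdlib.h>

#define ITERS		(1 << 20)
#define HOLD_CYCLES	100
#define WRITE_FRAC	25	/* Writers per 256 lock operations */

static pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;

static void *bench_thread(void *arg)
{
	unsigned seed = (unsigned) (unsigned long) arg;
	int i, j;

	for (i = 0; i < ITERS; i++)
	{
		/* Randomly read- or write-lock, by the chosen fraction */
		if ((rand_r(&seed) & 255) < WRITE_FRAC)
		{
			pthread_rwlock_wrlock(&lock);
		}
		else
		{
			pthread_rwlock_rdlock(&lock);
		}

		/* Hold the lock by spin-waiting for a given number of cycles */
		for (j = 0; j < HOLD_CYCLES; j++) cpu_relax();

		pthread_rwlock_unlock(&lock);
	}

	return NULL;
}

Four of these threads are started, one per core, and the total time to completion is measured.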

The default initialization of a lock using PTHREAD_RWLOCK_INITIALIZER gives a read-preferring version:

Reader-Preferring Posix Lock
Writers per 256:    0     1     25    128   250
Time (s):           1.1   2.0   4.4   6.1   6.4

Using PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP gives the writer-preferring variant:

Writer-Preferring Posix Lock
Writers per 256:    0     1     25    128   250
Time (s):           1.1   1.6   6.7   10.7  6.8

Notice how the performance characteristics of the two lock types are quite different. Writer-preferring locks are better when there are mostly readers because they prevent writer starvation. However, as the writer fraction rises they perform worse than their competitor, with the worst case at about 50-50 readers to writers. Finally, when the number of writers dominates, the two types of lock perform comparably. (However, in this situation, you'd probably want to use a simple mutex instead.)

Futex Bitset Locks

The above code "obviously" isn't optimal. It uses a low level mutex to synchronize the internal state of the read-writer lock. If we can avoid this, then perhaps we can gain extra parallelism. For example, it may be possible for many readers to simultaneously lock at the same time instead of being synchronized. The obvious way to do this is to use the bitset capability of Futexes. This allows one to record classes of threads waiting on the same futex. By using one bit for readers, and one for writers, we can then wake the correct group when desired.

A writer-preferring version might look like:


/* Sleeping Read-Write Lock (writer preferring) */
typedef union rwlock_t rwlock_t;

union rwlock_t
{
	unsigned long long ull;
	unsigned u;
	struct
	{
		unsigned char wlocked;
		unsigned char wcontended;
		unsigned char rlocked;
		unsigned char rcontended;
	} b;
};

#define RW_WLOCKED		(1ULL<<0)
#define RW_WCONTEND		(1ULL<<8)
#define RW_RLOCKED		(1ULL<<16)
#define RW_RCONTEND		(1ULL<<24)
#define RW_READS		(1ULL<<32)

#define RW_WRITER		1
#define RW_READER		2


void rwlock_wlock(rwlock_t *rw)
{
	while (1)
	{
		unsigned state = rw->u;

		/* Atomic read of lock state */
		barrier();
		
		/* Try to set write-locked if there are no readers */
		if (!(state & (RW_WLOCKED | RW_RLOCKED)))
		{
			/* If no readers, set write-locked bit */
			if (cmpxchg(&rw->ull, state, state | RW_WLOCKED) == state) return;
			
			/* Don't burn too much cpu waiting for RW_RLOCKED to be set */
			cpu_relax();
			
			/* Try again */
			continue;
		}
		
		/* Wait */
		rw->b.wcontended = 1;
		state |= RW_WCONTEND;

		sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_WRITER);
		
		/* Other writers may exist */
		rw->b.wcontended = 1;
		barrier();
	}
}

void rwlock_wunlock(rwlock_t *rw)
{
	unsigned state = rw->u;
	int i;
	
	/* Locked and uncontended */
	if ((state == 1) && (cmpxchg(&rw->u, 1, 0) == 1)) return;
	
	/* Unlock */
	rw->b.wlocked = 0;
	
	barrier();
	
	/* Spin and hope someone takes the lock */
	for (i = 0; i < 200; i++)
	{
		if (rw->b.wlocked || rw->b.rlocked) return;
		
		cpu_relax();
	}
	
	/* Wake a writer */
	if (rw->b.wcontended)
	{
		rw->b.wcontended = 0;
		if (sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER)) return;
	}
	
	/* Wake all readers */
	if (rw->b.rcontended)
	{
		rw->b.rcontended = 0;
		sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, NULL, NULL, RW_READER);
	}
}

void rwlock_rlock(rwlock_t *rw)
{
	while (1)
	{
		unsigned long long state = atomic_xadd(&rw->ull, RW_READS);
	
		/* Success? */
		if (!(state & (RW_WLOCKED | RW_WCONTEND)))
		{
			/* Already marked read-locked */
			if (state & RW_RLOCKED) return;
			
			/* Set read-locked bit so write-lock futex wait works correctly */
			rw->b.rlocked = 1;
			return;
		}

		/* Read unlock */
		state = atomic_xadd(&rw->ull, -RW_READS);
	
		/* Is it not write locked yet? */
		if (!(state & RW_WLOCKED))
		{
			/* Not write-contended, try again */
			if (!(state & RW_WCONTEND)) continue;

			state -= RW_READS;

			/* Wake a writer if needed */
			if (!(state >> 32))
			{
				/* Only wake on a readlocked -> writelocked transition */
				if (xchg_8(&rw->b.rlocked, 0) == 1)
				{
					rw->b.wcontended = 0;
					if (!sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER))
					{
						/* None there, so mustn't be contended any more */
						continue;
					}
				}
			}
		}
		
		state |= RW_RCONTEND;
		rw->b.rcontended = 1;
		
		sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_READER);
	}
}

void rwlock_runlock(rwlock_t *rw)
{
	/* Read unlock */
	unsigned long long state = atomic_xadd(&rw->ull, -RW_READS);
	int i;
	
	state -= RW_READS;
		
	while (1)
	{
		/* Other readers there, don't do anything */
		if (state >> 32) return;
		
		/* Try to turn off read-locked flag */
		if (cmpxchg(&rw->ull, state, state & ~RW_RLOCKED) == state) break;
		
		/* Failure, try again */
		state = rw->ull;
		cpu_relax();
	}
	
	/* Spin and hope someone takes the lock, or have no one waiting */
	for (i = 0; i < 200; i++)
	{
		if (rw->b.wlocked || rw->b.rlocked || !rw->b.wcontended) return;
		
		cpu_relax();
	}
	
	/* We need to wake someone up */
	rw->b.wcontended = 0;
	sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER);
}

This (provided gcc is prevented from inlining the functions) gives the results:

Writer-Preferring Bitset Lock
Writers per 256:    0     1     25    128   250
Time (s):           2.2   2.8   4.9   6.5   5.3

The above lock is unfortunately quite a bit slower than the Posix locks when there are many readers. Only when the writer count is unreasonably large does it perform well.

The reader-preferring variant is a bit simpler because the read-lock routine doesn't have to wait for writers:


/* Sleeping Read-Write Lock (reader preferring) */
typedef union rwlock_t rwlock_t;

union rwlock_t
{
	unsigned long long ull;
	unsigned u;
	struct
	{
		unsigned char wlocked;
		unsigned char wcontended;
		unsigned char rlocked;
		unsigned char rcontended;
	} b;
};

#define RW_WLOCKED		(1ULL<<0)
#define RW_WCONTEND		(1ULL<<8)
#define RW_RLOCKED		(1ULL<<16)
#define RW_RCONTEND		(1ULL<<24)
#define RW_READS		(1ULL<<32)

#define RW_WRITER		1
#define RW_READER		2

void rwlock_wlock(rwlock_t *rw)
{
	while (1)
	{
		unsigned state = rw->u;

		/* Atomic read of lock state */
		barrier();
		
		if (!(state & RW_WLOCKED))
		{
			if (!(state & RW_RLOCKED))
			{
				/* If no readers, set write-locked bit */
				if (cmpxchg(&rw->ull, state, state | RW_WLOCKED) == state) return;
			}
			
			/* Wake sleeping readers, if there are any */
			if (rw->b.rcontended)
			{
				rw->b.rcontended = 0;
				if (!sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, NULL, NULL, RW_READER)) continue;
			}
		}
		
		/* Wait until not write-locked */
		rw->b.wcontended = 1;
		state |= RW_WCONTEND;

		sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_WRITER);
		
		/* Other writers may exist */
		rw->b.wcontended = 1;
		barrier();
	}
}

void rwlock_wunlock(rwlock_t *rw)
{
	int i;
	
	/* Unlock */
	rw->b.wlocked = 0;
	
	barrier();
		
	/* Wake all readers */
	if (rw->b.rcontended)
	{
		rw->b.rcontended = 0;
		if (sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, NULL, NULL, RW_READER)) return;
	}
	
	/* Wake a writer */
	if (rw->b.wcontended)
	{
		rw->b.wcontended = 0;
		sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER);
	}
}

void rwlock_rlock(rwlock_t *rw)
{
	unsigned long long state = atomic_xadd(&rw->ull, RW_READS);
	
	while (1)
	{
		/* Success? */
		if (!(state & RW_WLOCKED))
		{
			/* Set read-locked bit so write-lock futex wait works correctly */
			if (!(state & RW_RLOCKED)) rw->b.rlocked = 1;
			
			return;
		}
		
		/* Wait until not write-locked */
		state |= RW_RCONTEND;
		rw->b.rcontended = 1;
		
		sys_futex(rw, FUTEX_WAIT_BITSET_PRIVATE, state, NULL, NULL, RW_READER);
		
		state = rw->ull;
		
		barrier();
	}
}

void rwlock_runlock(rwlock_t *rw)
{
	/* Read unlock */
	unsigned long long state = atomic_add(&rw->ull, -RW_READS);
	int i;
	
	while (1)
	{
		/* Other readers there, don't do anything */
		if (state >> 32) return;
		
		/* Try to turn off read-locked flag */
		if (cmpxchg(&rw->ull, state, state & ~RW_RLOCKED) == state) break;
		
		/* Failure, try again */
		state = rw->ull;
		cpu_relax();
	}

	/* Spin and hope someone takes the lock, or have no one waiting */
	for (i = 0; i < 200; i++)
	{
		if (rw->b.wlocked || rw->b.rlocked || !rw->b.wcontended) return;
		
		cpu_relax();
	}
	
	/* We need to wake someone up */
	rw->b.wcontended = 0;
	sys_futex(rw, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, RW_WRITER);
}

Reader-Preferring Bitset Lock
Writers per 256:    0     1     25    128   250
Time (s):           2.1   2.9   4.9   5.4   6.5

The read-preferring variant is also very slow compared to the Posix locks when there are many readers. However, it is the fastest so far for the case where the number of readers and writers is the same. There are two reasons for this slow-down. The first is that gcc doesn't know that the Futex syscall doesn't clobber any registers, so it saves and restores them all. In order to do this, it constructs a stack frame for each function. Unfortunately, the overhead of doing this can be quite large compared to the very little work done in the fast paths. Thus, in order to get optimal speed, using assembly is required (just as Glibc has done).

The second problem is the bitset Futex API itself. The problem here is that the kernel needs to scan the entire waitlist. Since the waitlist may contain both readers and writers, instead of just one or the other, it is longer than it otherwise would be. This increases the length of time the kernel holds the internal locks protecting the Futex waitlist, and thus reduces scalability. It probably doesn't matter much with only four threads hammering on the read-write lock API, but it will once hundreds, thousands or millions of threads do. In order to short-circuit this problem, we can simply make readers and writers wait on different lists.

Futex Eventcount Locks

The simplest way of splitting the single bitset-based lock into two is to use eventcounts. These allow a thread to notice if some event has occurred in the time since it registered its interest in that event. If the event hasn't occurred, the thread may then sleep until it does. This allows threads to notice if other threads alter the lock state, without having some sort of global mutex synchronizing everything.
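To make the idea concrete, here is a minimal eventcount sketch in the same style as the locks below. (The ec_key, ec_wait and ec_signal names are hypothetical; sys_futex, atomic_add and barrier are the primitives used throughout this article.) A waiter captures the count, re-tests its condition, and sleeps; a signaller bumps the count and wakes any registered sleepers:

typedef union eventcount eventcount;
union eventcount
{
	unsigned u;		/* Sequence count in bits 8 and up */
	unsigned char contend;	/* Low byte: set if a sleeper exists */
};

#define EC_CONTEND	1
#define EC_INC		256

/* Capture the count before testing the condition we care about */
static unsigned ec_key(eventcount *ec)
{
	unsigned key = ec->u | EC_CONTEND;
	barrier();
	return key;
}

/* Sleep, unless the count has already advanced past the captured key */
static void ec_wait(eventcount *ec, unsigned key)
{
	ec->contend = 1;
	sys_futex(ec, FUTEX_WAIT_PRIVATE, key, NULL, NULL, 0);
}

/* Advance the count, and wake sleepers if any registered themselves */
static void ec_signal(eventcount *ec)
{
	atomic_add(&ec->u, EC_INC);
	if (ec->contend)
	{
		ec->contend = 0;
		sys_futex(ec, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
	}
}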

The more complex writer-preferring algorithm using eventcounts looks like:


/* Sleeping Read-Write Lock using event counts (writer preferring) */
typedef struct rwlock_ec rwlock_ec;

struct rwlock_ec
{
	union
	{
		unsigned u;
		unsigned char wlocked;
	} l;
	
	union
	{
		unsigned u;
		unsigned char contend;
	} write_ec;
	
	union
	{
		unsigned u;
		unsigned char contend;
	} read_ec;
};

#define RW_WLOCKED		(1ULL<<0)
#define RW_READS		(1ULL<<8)

#define RW_EC_CONTEND	(1ULL<<0)
#define RW_EC_INC		(1ULL<<8)

void rwlock_wlock(rwlock_ec *rw)
{
	while (1)
	{
		unsigned ec = rw->write_ec.u | RW_EC_CONTEND;
		
		/* Try to set write-locked if there are no readers */
		if (!cmpxchg(&rw->l.u, 0, RW_WLOCKED)) return;
		
		/* Wait */
		rw->write_ec.contend = 1;
		sys_futex(&rw->write_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);
		
		/* Other writers may exist */
		rw->write_ec.contend = 1;
		barrier();
	}
}

void rwlock_wunlock(rwlock_ec *rw)
{
	/* Unlock */
	rw->l.wlocked = 0;
	
	/* Wake a writer */
	atomic_add(&rw->write_ec.u, RW_EC_INC);
	if (rw->write_ec.contend)
	{
		rw->write_ec.contend = 0;
		if (sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0)) return;
	}
	
	/* Wake all readers */
	atomic_add(&rw->read_ec.u, RW_EC_INC);
	if (rw->read_ec.contend)
	{
		rw->read_ec.contend = 0;
		sys_futex(&rw->read_ec, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
	}
}

void rwlock_rlock(rwlock_ec *rw)
{
	while (1)
	{
		unsigned ec = rw->read_ec.u | RW_EC_CONTEND;
		unsigned state;
		
		/* Make sure event count is read before trying to take lock */
		barrier();
		
		state = atomic_xadd(&rw->l.u, RW_READS);
	
		/* Success? */
		if (!(state & RW_WLOCKED) && !rw->write_ec.contend) return;

		/* Read unlock */
		state = atomic_xadd(&rw->l.u, -RW_READS);
		
		/*
		 * Is it not write locked yet, but a writer is sleeping,
		 * and we are at the readlocked -> writelocked transition.
		 */
		if (state == RW_READS)
		{
			atomic_add(&rw->write_ec.u, RW_EC_INC);
			if (rw->write_ec.contend)
			{
				rw->write_ec.contend = 0;
				
				/* Wake the writer, and then try again */
				sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
				continue;
			}
		}
		
		rw->read_ec.contend = 1;
		
		barrier();
		
		/* No need to enter the kernel... quickly test beforehand */
		if (rw->read_ec.u != ec) continue;
		
		/* Wait */
		sys_futex(&rw->read_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);
	}
}

void rwlock_runlock(rwlock_ec *rw)
{
	/* Read unlock */
	unsigned state = atomic_add(&rw->l.u, -RW_READS);
	
	/* Other readers there, don't do anything */
	if (state >> 8) return;
	
	/* We need to wake someone up */
	atomic_add(&rw->write_ec.u, RW_EC_INC);
	if (rw->write_ec.contend)
	{
		rw->write_ec.contend = 0;
		sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
	}
}

The reader-preferring version is again slightly simpler:


/* Sleeping Read-Write Lock using event counts (reader preferring) */
#define RW_WLOCKED		(1ULL<<0)
#define RW_READS		(1ULL<<8)

#define RW_EC_CONTEND	(1ULL<<0)
#define RW_EC_INC		(1ULL<<8)

void rwlock_wlock(rwlock_ec *rw)
{
	while (1)
	{
		unsigned ec = rw->write_ec.u;
		
		/* Try to set write-locked if there are no readers */
		if (!cmpxchg(&rw->l.u, 0, RW_WLOCKED)) return;
		
		rw->write_ec.contend = 1;
		ec |= RW_EC_CONTEND;
		
		sys_futex(&rw->write_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);
		
		/* Other writers may exist */
		rw->write_ec.contend = 1;
		barrier();
	}
}

void rwlock_wunlock(rwlock_ec *rw)
{
	/* Unlock */
	rw->l.wlocked = 0;
	
	/* Wake all readers */
	atomic_add(&rw->read_ec.u, RW_EC_INC);
	if (rw->read_ec.contend)
	{
		rw->read_ec.contend = 0;
		if (sys_futex(&rw->read_ec, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0)) return;
	}
	
	/* Wake a writer */
	atomic_add(&rw->write_ec.u, RW_EC_INC);
	if (rw->write_ec.contend)
	{
		rw->write_ec.contend = 0;
		sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
	}
}

void rwlock_rlock(rwlock_ec *rw)
{
	unsigned ec = rw->read_ec.u;
	unsigned state;
	
	/* Make sure event count is read before trying to take lock */
	barrier();
		
	state = atomic_xadd(&rw->l.u, RW_READS);
	
	while (state & RW_WLOCKED)
	{
		ec |= RW_EC_CONTEND;
		
		/* Sleep until no longer held by a writer */
		rw->read_ec.contend = 1;
		sys_futex(&rw->read_ec, FUTEX_WAIT_PRIVATE, ec, NULL, NULL, 0);
				
		ec = rw->read_ec.u;
		
		barrier();
		
		state = rw->l.u;
	}
}

void rwlock_runlock(rwlock_ec *rw)
{
	/* Read unlock */
	unsigned state = atomic_add(&rw->l.u, -RW_READS);
	
	/* Other readers there, don't do anything */
	if (state) return;
	
	/* We need to wake someone up */
	atomic_add(&rw->write_ec.u, RW_EC_INC);
	if (rw->write_ec.contend)
	{
		rw->write_ec.contend = 0;
		sys_futex(&rw->write_ec, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
	}
}

However, for optimal speed, we need to use assembly language. The optimized writer-preferring version is:


    .globl rwlock_wlock
    .type rwlock_wlock, @function
    .align 16
	
rwlock_wlock:
1:  mov    $0x1, %ecx
    xor    %eax, %eax
    mov    0x4(%rdi), %edx

    # Try to set write-locked
    lock cmpxchg %ecx, (%rdi)
    jne    2f
    rep; retq
	
    # Failed, go to sleep
2:  or     $0x1, %dl
    mov    $SYS_futex, %eax
    xor    %r10, %r10
    mov    $FUTEX_WAIT_PRIVATE, %esi
    movb   $0x1, 0x4(%rdi)
    add    $0x4, %rdi
    syscall
    movb   $0x1, (%rdi)
    sub    $0x4, %rdi
    jmp    1b
    .size rwlock_wlock, .-rwlock_wlock


    .globl rwlock_wunlock
    .type rwlock_wunlock, @function
    .align 16
	
rwlock_wunlock:
    mov    $0x100, %eax
    add    $0x4, %rdi
    movb   $0x0, -0x4(%rdi)
	
    # Any writers sleeping?
    lock addl %eax, (%rdi)
    cmpb   $0x0, (%rdi)
    jne    2f
	
    # Any readers sleeping?
    lock xadd %eax, 0x4(%rdi)
    test   %al, %al
    jne    3f
1:  rep; retq
	
    # Wake a writer
2:  mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x1, %edx
    movb   $0x0, (%rdi)
    syscall
    test   %eax, %eax
    jne    1b
	
    # Any readers sleeping?
    mov    $0x100, %eax
    lock xadd %eax, 0x4(%rdi)
    test   %al, %al
    je     1b

    # Wake all readers
3:  add    $0x4, %rdi
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x7fffffff, %edx
    movb   $0x0, (%rdi)
    syscall
    retq
	
    .size rwlock_wunlock, .-rwlock_wunlock


    .globl rwlock_rlock
    .type rwlock_rlock, @function
    .align 16

rwlock_rlock:
    add    $0x8, %rdi
1:  mov    $0x100, %eax
    mov    (%rdi), %edx
    mov    %eax, %ecx
	
    # Attempt to grab lock
    lock xadd %eax, -0x8(%rdi)
    or     -0x4(%rdi), %al
    jne    2f
    rep; retq
	
    # Failed, unlock
2:  mov    $SYS_futex, %eax
    lock sub %ecx, -0x8(%rdi)
    je     4f
	
    # Sleep on read eventcount
3:  or     $0x1, %dl
    mov    $FUTEX_WAIT_PRIVATE, %esi
    xor    %r10, %r10
    movb   $0x1, (%rdi)
    syscall
    jmp    1b
	
    # Test to see if any writers are sleeping
4:  lock add %ecx, -0x4(%rdi)
    cmpb   $0x0, -0x4(%rdi)
    je     3b
	
    # Wake a writer, and then try again
    sub    $0x4, %rdi
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    %rdx, %r8
    movb   $0x0, (%rdi)
    mov    $0x1, %edx
    syscall
    add    $0x4, %rdi
    mov    %r8, %rdx
    jmp    3b	
    .size rwlock_rlock, .-rwlock_rlock


    .globl rwlock_runlock
    .type rwlock_runlock,@function
    .align 16

rwlock_runlock:
    # One less reader
    mov    $0x100, %eax
    lock sub %eax, (%rdi)
    je     1f
    rep; retq
	
    # Do we need to wake someone up?
1:  lock xaddl %eax, 0x4(%rdi)
    test   %al, %al
    jne    2f
    rep; retq
	
    # Wake via the write eventcount
2:  add    $0x4, %rdi
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x1, %edx
    movb   $0x0, (%rdi)
    syscall
    retq
    .size rwlock_runlock, .-rwlock_runlock

This code gives the following timings:

Writer-Preferring Eventcount Lock
Writers per 256:    0     1     25    128   250
Time (s):           2.1   2.6   4.7   7.3   8.5

These results are slightly faster than the bitset locks for larger reader fractions, but still horrible compared to the pthreads code. In addition, they are much worse for large writer fractions.

The reader-preferring variant looks like:


    .globl rwlock_wlock
    .type rwlock_wlock, @function
    .align 16
	
rwlock_wlock:
1:  mov    $0x1, %ecx
    xor    %eax, %eax
    mov    0x4(%rdi), %edx
	
    # Try to set to write-locked
    lock cmpxchg %ecx, (%rdi)
    jne    2f
    rep; retq
	
    # Failed, sleep until woken via an eventcount
2:  or     $0x1, %dl
    mov    $SYS_futex, %eax
    xor    %r10, %r10
    mov    $FUTEX_WAIT_PRIVATE, %esi
    movb   $0x1, 0x4(%rdi)
    add    $0x4, %rdi
    syscall
    movb   $0x1, (%rdi)
    sub    $0x4, %rdi
    jmp    1b
    .size rwlock_wlock, .-rwlock_wlock


    .globl rwlock_wunlock
    .type rwlock_wunlock, @function
    .align 16

rwlock_wunlock:
    mov    $0x100, %eax
    movb   $0x0, (%rdi)
	
    # Are there any sleeping readers?
    lock add %eax, 0x8(%rdi)
    cmpb    $0x0, 0x8(%rdi)
    jne    2f
	
    # Are there any sleeping writers?
    lock xadd %eax, 0x4(%rdi)
    test   %al, %al
    jne    3f
1:  rep; retq
	
    # Wake all readers
2:  movb   $0x0, 0x8(%rdi)
    mov    $0x7fffffff, %rdx
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    add    $0x8, %rdi
    syscall
    test   %eax, %eax
    jne    1b

    # Any sleeping writers?
    lock addl $0x100, -0x4(%rdi)
    cmpb    $0x0, -0x4(%rdi)
    je     1b
    sub    $0x8, %rdi
	
    # Wake a writer
3:  movb   $0x0, 0x4(%rdi)
    mov    $0x1, %edx
    add    $0x4, %rdi
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    syscall
    retq
    .size rwlock_wunlock, .-rwlock_wunlock

    .globl rwlock_rlock
    .type rwlock_rlock, @function
    .align 16
	
rwlock_rlock:
    mov    $0x100, %eax
    mov    0x8(%rdi), %edx
	
    # Attempt to take the lock
    lock xadd %eax, (%rdi)
    test   %al, %al
    jne    1f
    rep; retq
	
    # Failure, sleep until not held by a writer
1:  xor    %r10, %r10
    mov    $FUTEX_WAIT_PRIVATE, %esi
    add    $0x8, %rdi

2:  or     $0x1, %dl
    mov    $SYS_futex, %eax
    mov    %dl, (%rdi)
    syscall
    mov    (%rdi), %edx
    cmpb   $0x1, -0x8(%rdi)
    je     2b
    rep; retq
    .size rwlock_rlock, .-rwlock_rlock


    .globl rwlock_runlock
    .type rwlock_runlock,@function
    .align 16
	
rwlock_runlock:
    # One less reader
    mov    $0x100, %eax
    lock sub %eax, (%rdi)
    je     1f
    rep; retq
	
    # Do we need to wake someone up?
1:  lock xaddl %eax, 0x4(%rdi)
    test   %al, %al
    jne    2f
    rep; retq
	
    # Wake a writer
2:  add    $0x4, %rdi
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x1, %edx
    movb   $0x0, (%rdi)
    syscall
    retq
    .size rwlock_runlock, .-rwlock_runlock

This gives the following benchmark results:

Reader-Preferring Eventcount Lock
Writers per 256:    0     1     25    128   250
Time (s):           2.1   2.9   4.6   5.7   6.3

These are slightly worse for the reader-dominated cases, but much better in the writer-dominated ones. They still aren't nearly as good as the Glibc results.

Fair Read-Write Locks

The above locks are either reader-preferring or writer-preferring. However, there is a third type of read-write lock, one that prefers neither. Such a "Fair Lock" maintains an order such that neither readers nor writers can be starved. This can improve throughput since forward progress is guaranteed. The simplest way to create such a lock uses two separate mutexes. One mutex maintains the ordering, and the second controls the mutual exclusivity between readers and writers:


/* Fair rwlock using two mutexes */

typedef struct fair_rwlock fair_rwlock;
struct fair_rwlock
{
	mutex m;
	mutex writer;
	unsigned read_count;
};

void rwlock_wlock(fair_rwlock *rw)
{
	/* Use mutex m to communicate with read-lock code */
	mutex_lock(&rw->m);
	mutex_lock(&rw->writer);
	mutex_unlock(&rw->m);
}

void rwlock_wunlock(fair_rwlock *rw)
{
	mutex_unlock(&rw->writer);
}

void rwlock_rlock(fair_rwlock *rw)
{
	/* The first reader also locks the write mutex */
	mutex_lock(&rw->m);
	if (!atomic_xadd(&rw->read_count, 1)) mutex_lock(&rw->writer);
	mutex_unlock(&rw->m);
}

void rwlock_runlock(fair_rwlock *rw)
{
	/* When done, unlock the write mutex */
	if (atomic_xadd(&rw->read_count, -1) == 1) mutex_unlock(&rw->writer);
}

Notice how in the above, most of the complexity has been hidden within the mutex implementation. We will use a low-level lock the size of a Futex (integer) for this:


    .globl ll_lock
    .type ll_lock,@function
    .align 16
	
ll_lock:
    xor     %ecx, %ecx

    # Spin a bit to try to get lock
1:  mov     $1, %al
    xchgb   (%rdi), %al
    test    %al, %al
    jz      3f
    pause
    add     $-1, %cl
    jnz     1b
	
    # Set up syscall details
    mov     $0x0101, %edx
    mov     $FUTEX_WAIT_PRIVATE, %esi
    xor     %r10, %r10
	
    # Wait loop
2:  mov     $SYS_futex, %eax
    syscall
    mov     %edx, %eax
    xchgl   (%rdi), %eax
    test    $1, %eax
    jnz     2b
3:  rep; ret
    .size   ll_lock, .-ll_lock

    .globl ll_unlock
    .type ll_unlock,@function
    .align 16
	
ll_unlock:
    xor     %ecx, %ecx
    cmpl    $1, (%rdi)
    jne     1f
    mov     $1, %eax
    # Fast path... no one is sleeping
    lock; cmpxchgl %ecx, (%rdi)
    jz      3f
1:  movb    $0, (%rdi) 

    # Spin, and hope someone takes the lock
2:  testb   $1, (%rdi)
    jnz     3f
    pause
    add     $-1, %cl
    jnz     2b

    # Wake up someone
    movb    $0, 1(%rdi)
    mov     $SYS_futex, %eax
    mov     $FUTEX_WAKE_PRIVATE, %esi
    mov     $1, %edx
    syscall
3:  ret
    .size   ll_unlock, .-ll_unlock
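In C, this pair behaves roughly like the following sketch. (Here xchg_32 is an assumed 32-bit exchange helper; xchg_8, cmpxchg, cpu_relax, barrier and sys_futex are the helpers used elsewhere in this article.)

typedef union llmutex llmutex;
union llmutex
{
	unsigned u;
	struct
	{
		unsigned char locked;
		unsigned char contended;
	} b;
};

void ll_lock(llmutex *m)
{
	int i;

	/* Spin a bit, hoping to grab the lock */
	for (i = 0; i < 256; i++)
	{
		if (!xchg_8(&m->b.locked, 1)) return;
		cpu_relax();
	}

	/* Mark as locked + contended, and sleep until we take the lock */
	while (xchg_32(&m->u, 0x0101) & 1)
	{
		sys_futex(m, FUTEX_WAIT_PRIVATE, 0x0101, NULL, NULL, 0);
	}
}

void ll_unlock(llmutex *m)
{
	int i;

	/* Fast path: locked and uncontended */
	if ((m->u == 1) && (cmpxchg(&m->u, 1, 0) == 1)) return;

	/* Unlock */
	m->b.locked = 0;
	barrier();

	/* Spin, and hope someone takes the lock */
	for (i = 0; i < 256; i++)
	{
		if (m->b.locked) return;
		cpu_relax();
	}

	/* Wake someone up */
	m->b.contended = 0;
	sys_futex(m, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}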

Using the above primitives, we can construct the asm versions of the mutex-based fair read-write lock. The main trick here is that the asm routines do not modify %rdi, which holds the address of the mutex. This means that the calling routine does not have to save and restore this register, avoiding the construction of a stack frame.


    .globl rwlock_wlock
    .type rwlock_wlock,@function
    .align 16
	
rwlock_wlock:
    call    ll_lock
    add     $0x4, %rdi
    call    ll_lock
    sub     $0x4, %rdi
    jmp     ll_unlock
    .size rwlock_wlock, .-rwlock_wlock
	
	
    .globl rwlock_wunlock
    .type rwlock_wunlock,@function
    .align 16
	
rwlock_wunlock:
    add     $0x4, %rdi
    jmp     ll_unlock
    .size rwlock_wunlock, .-rwlock_wunlock

	
    .globl rwlock_rlock
    .type rwlock_rlock,@function
    .align 16
	
rwlock_rlock:
    call    ll_lock
    mov     $1, %eax
	
    # Are we the first reader?
    lock xadd %eax, 0x8(%rdi)
    test    %eax, %eax
    jnz     ll_unlock
    add     $0x4, %rdi
    call    ll_lock
    sub     $0x4, %rdi
    jmp     ll_unlock
    .size rwlock_rlock, .-rwlock_rlock


    .globl rwlock_runlock
    .type rwlock_runlock,@function
    .align 16
	
rwlock_runlock:
    lock addl $-1, 0x8(%rdi)
    jz      rwlock_wunlock
    rep; ret
    .size rwlock_runlock, .-rwlock_runlock

The benchmark results of the mutex-based fair lock are:

Mutex Fair Lock
Writers per 256:    0     1     25    128   250
Time (s):           1.1   1.3   2.6   6.0   6.1

This lock is amazingly fast, beating or equalling the Posix locks in all situations. However, some of the other lock implementations above do beat it when the number of readers is the same as writers. Perhaps we can improve speed some more by noticing that the above algorithm doesn't exploit the fact that multiple readers may be awoken simultaneously when a reader gains control of the lock. The simplest way of arranging this using "standard" threading primitives is to use a condition variable. By broadcasting to the condition variable, multiple threads can be awoken at once. C code that does this looks like:


typedef struct fair_rwlock fair_rwlock;
struct fair_rwlock
{
	mutex m;
	
	mutex waitlist;
	
	cv read_wait;
	
	unsigned read_count;
	unsigned write_count;
	char read_flag;
};


void rwlock_wlock(fair_rwlock *rw)
{
	/* Use mutex m to synchronize the lock state with the read path */
	mutex_lock(&rw->m);
	rw->write_count++;
	mutex_unlock(&rw->m);
	
	mutex_lock(&rw->waitlist);
}

void rwlock_wunlock(fair_rwlock *rw)
{
	/* Use mutex m to synchronize the lock state with the read path */
	mutex_lock(&rw->m);
	rw->write_count--;
	mutex_unlock(&rw->m);
	
	mutex_unlock(&rw->waitlist);
}

void rwlock_rlock(fair_rwlock *rw)
{
	mutex_lock(&rw->m);
	
	while (1)
	{
		/* Fast path, only readers */
		if (!rw->write_count && rw->read_count)
		{
			rw->read_count++;
			mutex_unlock(&rw->m);

			return;
		}

		/* Any other waiting readers? */
		if (!rw->read_flag) break;
		
		/* Have a waiting reader, append to wait-list */
		cond_wait(&rw->read_wait, &rw->m);
		
		/* If other readers are there, then we can read-lock. */
		if (rw->read_count)
		{
			rw->read_count++;
			mutex_unlock(&rw->m);
			return;
		}
	}

	/* Need to wait until writers are done */
	rw->read_flag = 1;
	mutex_unlock(&rw->m);
	mutex_lock(&rw->waitlist);
	mutex_lock(&rw->m);
	
	/* Wake other waiting readers, and lock the write-lock */
	rw->read_flag = 0;
	rw->read_count++;
	cond_broadcast(&rw->read_wait);
	mutex_unlock(&rw->m);
}

void rwlock_runlock(fair_rwlock *rw)
{
	mutex_lock(&rw->m);
	rw->read_count--;
	if (!rw->read_count)
	{
		/* The last reader unlocks the write-lock */
		mutex_unlock(&rw->waitlist);
	}
	mutex_unlock(&rw->m);
}

And similarly, in assembly language:


    .globl rwlock_wlock
    .type rwlock_wlock,@function
    .align 16
	
rwlock_wlock:
    call    ll_lock
    addl    $1, 0x10(%rdi)
    call    ll_unlock
	
    add     $0x4, %rdi
    jmp     ll_lock
    .size rwlock_wlock, .-rwlock_wlock
	
	
    .globl rwlock_wunlock
    .type rwlock_wunlock,@function
    .align 16
	
rwlock_wunlock:
    call    ll_lock
    subl    $1, 0x10(%rdi)
    call    ll_unlock
	
    add     $0x4, %rdi
    jmp     ll_unlock
    .size rwlock_wunlock, .-rwlock_wunlock

	
    .globl rwlock_rlock
    .type rwlock_rlock,@function
    .align 16

rwlock_rlock:
    call    ll_lock
	
1:  cmpl    $0, 0x10(%rdi)
    jne     2f
    cmpl    $0, 0xc(%rdi)
    je      2f
	
    addl    $1, 0xc(%rdi)
    jmp     ll_unlock

2:  cmpb    $0, 0x14(%rdi)
    je      3f
	
    # Save seq for condvar
    movl    0x8(%rdi), %r8d

    # Unlock mutex
    call    ll_unlock
    add     $0x8, %rdi

    # Wait on condvar
    mov     %r8d, %edx
    mov     $SYS_futex, %eax
    xor     %r10, %r10
    mov     $FUTEX_WAIT_PRIVATE, %esi
    movb    $1, (%rdi)
    syscall

    # Lock the mutex again
    sub     $0x8, %rdi
    call    ll_lock

    #  Mark as contended
    movb    $1, 0x1(%rdi)

    cmpl    $0, 0xc(%rdi)
    je      1b
    addl    $1, 0xc(%rdi)
    jmp     ll_unlock

    # We are the first reader - write lock it
3:  movb    $1, 0x14(%rdi)
    call    ll_unlock
    add     $0x4, %rdi
    call    ll_lock
    sub     $0x4, %rdi
    call    ll_lock
	
    # Convert to a read lock
    movb    $0, 0x14(%rdi)
    mov     $0x100, %eax
    addl    $1, 0xc(%rdi)
	
    # Any readers waiting?
    lock xadd %eax, 0x8(%rdi)
    test    $1, %al
    jz      ll_unlock
	
    # Wake them
    mov     %rdi, %r8
    add     $0x8, %rdi
    mov     $SYS_futex, %eax
    mov     $FUTEX_REQUEUE_PRIVATE, %esi
    mov     $1, %edx
    mov     $0x7fffffff, %r10
    syscall
    sub     $0x8, %rdi
    jmp ll_unlock
    .size rwlock_rlock, .-rwlock_rlock


    .globl rwlock_runlock
    .type rwlock_runlock,@function
    .align 16
	
rwlock_runlock:
    call    ll_lock
    subl    $1, 0xc(%rdi)
    jnz     ll_unlock
    call    ll_unlock
    add     $0x4, %rdi
    jmp     ll_unlock
    .size rwlock_runlock, .-rwlock_runlock

This algorithm benchmarks as:

Mutex + CV Fair Lock
Writers per 256:    0     1     25    128   250
Time (s):           1.6   4.2   7.8   6.9   5.4

This performs horribly for medium numbers of readers, but very well for large numbers of writers. However, again, such a situation would usually be solved through the use of a simple mutex rather than a read-write lock. The reason for the poor performance is that the condition variable at the lowest level doesn't perform in the way we would wish. Instead of waking all sleeping readers, it wakes one, and re-queues the rest on the mutex. We can fix this by replacing the condvar with an eventcount. Similarly, we can merge the waiter count with the mutex, and the read flag with the reader count, shrinking the lock data structure quite a bit. Doing this yields the rather complex:


typedef struct fair_rwlock fair_rwlock;
struct fair_rwlock
{
	union
	{
		unsigned long long l;	/* Bit 0: locked, bit 1: readers may enter,
					   bits 8-31: waiter count, bit 32: read flag,
					   bits 33-63: reader count */
		unsigned waiters;	/* Low 32 bits, used as the futex word */
		unsigned char locked;	/* Low byte, used for byte-wise lock ops */
	} lock;
	
	union
	{
		unsigned seq;		/* Eventcount sequence for waiting readers */
		unsigned char contend;	/* Low byte: a reader is sleeping */
	} read_wait;
};

void rwlock_wlock(fair_rwlock *rw)
{
	int i;
	
	/* There is a waiting writer */
	atomic_add(&rw->lock.waiters, 256);
	
	/* Spin for a bit to try to get the lock */
	for (i = 0; i < 100; i++)
	{
		if (!(xchg_8(&rw->lock.locked, 1) & 1)) return;

		cpu_relax();
	}
	
	/* Failed, so we need to sleep */
	while (xchg_8(&rw->lock.locked, 1) & 1)
	{
		sys_futex(&rw->lock, FUTEX_WAIT_PRIVATE, rw->lock.waiters | 1, NULL, NULL, 0);
	}
}

void rwlock_wunlock(fair_rwlock *rw)
{
	/* One less writer and also unlock simultaneously */
	if (atomic_add(&rw->lock.waiters, -257))
	{
		int i;
		
		/* Spin for a bit hoping someone will take the lock */
		for (i = 0; i < 200; i++)
		{
			if (rw->lock.locked) return;

			cpu_relax();
		}
		
		/* Failed, we need to wake someone */
		sys_futex(&rw->lock, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
	}
}

void rwlock_rlock(fair_rwlock *rw)
{
	while (1)
	{
		unsigned long long val;
		unsigned seq = rw->read_wait.seq;
		
		barrier();
		
		val = rw->lock.l;
		
		/* Fast path, no readers or writers */
		if (!val)
		{
			if (!cmpxchg(&rw->lock.l, 0, 1 + 2 + (1ULL << 33))) return;
			
			/* Failed, try again */
			cpu_relax();
			continue;
		}
	
		/* Fast path, only readers */
		if ((((unsigned) val) == 1 + 2) && (val >> 33))
		{
			if (cmpxchg(&rw->lock.l, val, val + (1ULL << 33)) == val) return;
			
			/* Failed, try again */
			cpu_relax();
			continue;
		}
		
		/* read flag set? */
		if (val & (1ULL << 32))
		{
			/* Have a waiting reader, append to wait-list */
			rw->read_wait.contend = 1;

			sys_futex(&rw->read_wait, FUTEX_WAIT_PRIVATE, seq | 1, NULL, NULL, 0);
			
			val = rw->lock.l;
			
			/* If there are readers, we can take the lock */
			if (((val & 3) == 3) && (val >> 33))
			{
				/* Grab a read lock so long as we are allowed to */
				if (cmpxchg(&rw->lock.l, val, val + (1ULL << 33)) == val) return;
			}
			
			/* Too slow, try again */
			cpu_relax();
			continue;
		}
		
		/* Try to set read flag + waiting */
		if (cmpxchg(&rw->lock.l, val, val + 256 + (1ULL << 32)) == val) break;
		
		cpu_relax();
	}
	
	/* Grab write lock */
	while (xchg_8(&rw->lock.locked, 1) & 1)
	{
		sys_futex(&rw->lock, FUTEX_WAIT_PRIVATE, rw->lock.waiters | 1, NULL, NULL, 0);
	}
	
	/* Convert write lock into read lock. Use 2-bit to denote that readers can enter */
	atomic_add(&rw->lock.l, 2 - 256 - (1ULL << 32) + (1ULL << 33));
	
	/* We are waking everyone up */
	if (atomic_xadd(&rw->read_wait.seq, 256) & 1)
	{
		rw->read_wait.contend = 0;

		/* Wake all sleeping readers at once */
		sys_futex(&rw->read_wait, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
	}
}

void rwlock_runlock(fair_rwlock *rw)
{
	/* Unlock... and test to see if we are the last reader */
	if (!(atomic_add(&rw->lock.l, -(1ULL << 33)) >> 33))
	{
		rw->lock.locked = 0;

		barrier();
		
		/* We need to wake someone up? */
		if (rw->lock.waiters)
		{
			sys_futex(&rw->lock, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
		}
	}
}

Or, in asm:


    .globl rwlock_wlock
    .type rwlock_wlock,@function
    .align 16
	
rwlock_wlock:
    # There is now a waiting writer
    lock addl $0x100, (%rdi)
	
    # Try to write-lock
1:  mov    $0x1, %al
    xchg   %al, (%rdi)
    test   $0x1, %al
    jnz    2f
    rep; retq
 	
    # Wait until we can try to take the lock
2:  mov    (%rdi), %edx
    mov    $SYS_futex, %eax
    or     $1, %dl
    xor    %r10, %r10
    mov    $FUTEX_WAIT_PRIVATE, %esi
    syscall
    jmp    1b
    .size rwlock_wlock, .-rwlock_wlock


    .globl rwlock_wunlock
    .type rwlock_wunlock,@function
    .align 16

rwlock_wunlock:
    # One less writer + unlock simultaneously
    mov    $0xfffffeff, %eax
    lock xadd %eax, (%rdi)
    cmp    $0x101, %eax
    jne    1f
    rep; retq
	
    # We need to wake someone
1:  mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x1, %edx
    syscall
    retq
    .size rwlock_wunlock, .-rwlock_wunlock


    .globl rwlock_rlock
    .type rwlock_rlock,@function
    .align 16

runlock_c1:
.quad 0x200000003

runlock_c2:
.quad 0x200000000

runlock_c3:
.quad 0x100000100

rwlock_rlock:
    mov    (%rdi), %rax
    jmp    2f
1:  pause 
2:  test   %rax, %rax
    jnz    3f
	
    # Fast path - no one has the lock, so try to grab it
    mov    runlock_c1(%rip), %rcx
    lock cmpxchg %rcx, (%rdi)
    jnz    3f
    rep; retq
	
    # Are there other readers but no writers?
3:  mov    %rax, %rcx
    cmp    $0x3, %eax
    jne    5f
    shr    $0x21, %rcx
    je     5f
4:  mov    %rax, %rcx
    add    runlock_c2(%rip), %rcx
    lock cmpxchg %rcx, (%rdi)
    jnz    1b
    rep; retq
	
    # Is there a waiting reader?
5:  mov    8(%rdi), %edx
    testb  $0x1, 4(%rdi)
    je     6f
	
    # Wait until that reader is woken
    add    $0x8, %rdi
    orb    $0x1, %dl
    xor    %r10, %r10
    mov    $FUTEX_WAIT_PRIVATE, %esi
    mov    $SYS_futex, %eax
    movb   $0x1, (%rdi)
    syscall
	
    # Try to grab the lock once we are woken
    mov    -0x8(%rdi), %rax
    sub    $0x8, %rdi
    cmp    $0x3, %al
    jne    1b
    mov    %rax, %rcx
    shr    $0x21, %rcx
    je     1b
    jmp    4b

    # Try to be the first reader
6:  mov    %rax, %rcx
    add    runlock_c3(%rip), %rcx
    lock cmpxchg %rcx, (%rdi)
    jnz    1b
	
    # We are the first reader to wait
    xor    %r10, %r10
    mov    $FUTEX_WAIT_PRIVATE, %esi

    # Try to get the write lock
7:  mov    $0x1, %al
    xchg   %al, (%rdi)
    test   $0x1, %al
    jz     8f
	
    # Failed, sleep until we are next in line
    mov    (%rdi), %edx
    mov    $SYS_futex, %eax
    or     $1, %dl
    syscall
    jmp    7b
	
    # Convert into a read lock
8:  mov    $0xffffff02, %eax
    lock add %rax, (%rdi)
    mov    $0x100, %eax
	
    # Are there any sleeping readers?
    lock xadd %eax, 0x8(%rdi)
    test   $0x1, %al
    jz     9f
	
    # Wake them
    add    $0x8, %rdi
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x7fffffff, %edx
    movb   $0x0, (%rdi)
    syscall
9:  retq   
    .size rwlock_rlock, .-rwlock_rlock

    .globl rwlock_runlock
    .type rwlock_runlock,@function
    .align 16
	
rwlock_runlock:
    mov    $-2, %eax
    mov    %eax, %edx
	
    # Unlock, and test to see if we are the last reader
    lock xadd %eax, 4(%rdi)
    add    %edx, %eax
    and    %edx, %eax
    jz     1f
    rep; retq
	
    # Is there anyone sleeping?
1:  movb   $0, (%rdi)
    cmp    %eax, (%rdi)
    je     2f
	
    # Wake someone
    mov    $SYS_futex, %eax
    mov    $FUTEX_WAKE_PRIVATE, %esi
    mov    $0x1, %edx
    syscall
2:  retq
    .size rwlock_runlock, .-rwlock_runlock

Yielding the results:

Event Count Fair Lock
Writers per 256:    0     1     25    128   250
Time (s):           1.1   1.6   6.9   9.3   7.7

This is much improved compared to the condition variable based lock, being comparable to the writer-preferring Posix lock. However, it is quite slow compared to the mutex-based fair lock. Unfortunately, there is a race on reader-thread wakeup. A waking reader may miss the window where it can take the lock, in which case it goes back to sleep. This causes a loss of performance. One might hope that making the newly-wakened readers spin with a cmpxchg loop would help. However, that produces even worse benchmark results.

Summary

The best performing read-write locks seem to be the simplest. Using complex algorithms with many atomic instructions seems to result in worse performance. Even though the Posix read-write locks in Glibc are implemented via a mutex, they perform extremely well. However, it is still possible to do better. By using a mutex-based fair read-write lock, one can gain extra speed. That particular lock is about twice as fast as the default reader-preferring lock when there are many readers, and around twice as fast as the writer-preferring lock when the numbers of readers and writers are similar. In other situations, it performs as well as the competition.

It may be possible to improve the fair lock by allowing multiple readers to awaken simultaneously. However, doing so is non-trivial. The reader which wakes the others may finish with the critical section before the wakeups are complete. This causes a race which is hard to plug without extra locking that slows things down more than what is gained. The other possibility, of allowing such newly awakened readers to fail to grab the lock, causes the fairness guarantees to be weakened. This again results in a loss of performance due to starvation.

Comments

Samy Al Bahra said...
http://repnop.org/ck/doc/ck_bytelock.html has a good fast path for readers.
Borislav Trifonov said...
Would it be possible to increase the isolation between readers by having them use separate mutexes and counters, shifting more work to the writer which would have to check all? Joe Duffy got an improvement for doing this with spinning RW locks and 16*CPUcount slots: http://www.bluebytesoftware.com/blog/2009/02/21/AMoreScalableReaderwriterLockAndABitLessHarshConsiderationOfTheIdea.aspx (at time of this post the link is down but Google's cache at http://webcache.googleusercontent.com/search?q=cache:GP1RCH1S5gcJ:www.bluebytesoftware.com/blog/2009/02/21/AMoreScalableReaderwriterLockAndABitLessHarshConsiderationOfTheIdea.aspx+/AMoreScalableReaderwriterLockAndABitLessHarshConsiderationOfTheIdea&cd=1&hl=en&ct=clnk&gl=ca&client=firefox-a&source=www.google.ca works).
Bryan LaPorte said...
Actually, for the event counting code, there is a bug in the assembly for the write-preferring rwlock_wunlock which I believe is skewing your results by causing many spurious wake-ups.

The check for writer contention should be a "cmpb" and not "cmp". As it is, we'll make the wake system call for exclusive waiters every time we do a write unlock. I reconstructed the test scenario described here and found that the corrected code is very competitive with the native rwlocks.

Really the worst issue is with the interface itself. The acquisition and release semantics should be exposed to the consumer of the lock, which has never been done entirely (if I recall, the Windows kernel has a shared-acquire-starve-exclusive call which is a step in this direction).

I also flipped over and ran the scenario on Windows SRW locks. In all but the uncontentested case (predictably, but sadly), they basically get toasted.

I dug into the SRW disassembly and found that unsurprisingly they're style over substance, with what appear to be some pretty silly things going on. I'd be happy to type something up on they're design and send it on, if you think your readers would be interested.
Bryan LaPorte said...
"Their", not "they're" in that last sentence. Hate typos.

@Borislav: I read through the article you posted and I'm not sure what you're asking about. Do you mean can a lock be constructed which separates the contention between readers and writers better?

The lock that Joe Duffy is describing is a C# algorithm being compared to other theoretical C# algorithms. The locks described above will more than likely beat it hands down in any scenario (the performance difference between C# and hand-optimized assembly is typically vast).
Alex said...
In the glibc's version of rwlock_rdlock(), shouldn't the waiting condition be instead:

 /* Wait until there are no writers */
-while (l->wwait || (l->wrlock && l->writer_preferred))
+while (l->wrlock || (l->wwait && l->writer_preferred))
 {
 l->rwait++;
 ...

Otherwise we cannot recursively rdlock as required by POSIX.
Borislav Trifonov said...
How would one go about implementing trylock() and timedlock() for the fair version of the R/W mutex, assuming the underlying mutex type supports these? It seems to me that for the timed version, an absolute timeout is needed where both the m and writer mutexes are part of the timed wait, because the m could be contended. Here's my attempt, but I'm not sure it's right:

bool rwlock_wtrylock(fair_rwlock *rw)
{
        if (!mutex_trylock(&rw->m)) return false;
        if (mutex_trylock(&rw->writer))
        {
                mutex_unlock(&rw->m);
                return true;
        }
        mutex_unlock(&rw->m);
        return false;
}

bool rwlock_wtrylockuntil(fair_rwlock *rw, unsigned int tEnd)
{
        if (!mutex_trylockuntil(&rw->m, tEnd)) return false;
        if (mutex_trylockuntil(&rw->writer, tEnd))
        {
                mutex_unlock(&rw->m);
                return true;
        }
        mutex_unlock(&rw->m);
        return false;
}

bool rwlock_rtrylock(fair_rwlock *rw)
{
        if (!mutex_trylock(&rw->m)) return false;
        if (atomic_xadd(&rw->read_count, 1) || mutex_trylock(&rw->writer))
        {
                mutex_unlock(&rw->m);
                return true;
        }
        mutex_unlock(&rw->m);
        return false;
}

bool rwlock_rtrylockuntil(fair_rwlock *rw, unsigned int tEnd)
{
        if (!mutex_trylockuntil(&rw->m, tEnd)) return false;
        if (atomic_xadd(&rw->read_count, 1) || mutex_trylockuntil(&rw->writer, tEnd))
        {
                mutex_unlock(&rw->m);
                return true;
        }
        mutex_unlock(&rw->m);
        return false;
}