Lockless Inc

Phasers

Phasers are a generalization of the barrier synchronization primitive. A barrier synchronizes a given set of threads: once all of them have reached the barrier, it lets them continue. Until then, threads must wait for their tardy compatriots to arrive. This is inefficient, as those threads could be doing other work whilst they wait. This is the key difference with phasers, where the acts of "arriving" and "waiting" are separated.

The key operations for a phaser are "signaling" (or arriving) and "waiting". A thread signals a phaser to let it know that it has been reached. In return, the phaser gives that thread a handle (a sequence number). The thread can later use that sequence number to test whether all the other threads have reached the phaser. It can either poll the phaser, or sleep on it using that handle. Another possibility is to have an interface function that does both, in which case the phaser behaves exactly like a barrier would.
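To make the interface concrete before diving into the real implementation, here is a minimal portable sketch of a phaser built from a mutex and condition variable. The names (simple_phaser etc.) are ours, not part of the article's code, and the real version below uses atomics and futexes instead:

```c
#include <pthread.h>

/* A hypothetical mutex-based phaser, for illustration only */
typedef struct
{
	pthread_mutex_t lock;
	pthread_cond_t cond;
	unsigned total;		/* Threads per phase */
	unsigned count;		/* Arrivals so far this phase */
	unsigned seq;		/* Current phase number */
} simple_phaser;

void simple_phaser_init(simple_phaser *p, unsigned total)
{
	pthread_mutex_init(&p->lock, NULL);
	pthread_cond_init(&p->cond, NULL);
	p->total = total;
	p->count = 0;
	p->seq = 0;
}

/* Signal: note our arrival, and return a sequence number handle */
unsigned simple_phaser_sig(simple_phaser *p)
{
	unsigned seq;

	pthread_mutex_lock(&p->lock);
	seq = p->seq;

	/* Last to arrive: start the next phase and wake everyone */
	if (++p->count == p->total)
	{
		p->count = 0;
		p->seq++;
		pthread_cond_broadcast(&p->cond);
	}
	pthread_mutex_unlock(&p->lock);

	return seq;
}

/* Wait until the phase named by the handle has completed */
void simple_phaser_wait(simple_phaser *p, unsigned seq)
{
	pthread_mutex_lock(&p->lock);
	while (p->seq == seq) pthread_cond_wait(&p->cond, &p->lock);
	pthread_mutex_unlock(&p->lock);
}
```

The point of the split interface is that a thread can do useful work between the two calls: h = simple_phaser_sig(p); do_other_work(); simple_phaser_wait(p, h);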

A phaser can thus be made by splitting a barrier in two. Instead of having one wait function, we will now have a signal and a wait function. We will use the fast pool barrier as the skeleton for this. Abstracting away the internals of the data structure gives:


/* Signal a phaser, and then return a sequence number handle */
unsigned phaser_sig(phaser_t *p)
{
	unsigned seq, total, count;

	ref(p);

	add_count(p, &total, &count, &seq);

	if (count == total) wake_threads(p);

	/* Use lowest bit to say if we are allowed to immediately go */
	return (seq & ~1U) | (count <= total);
}

/* Returns true if the sequence number has changed */
int phaser_test(phaser_t *p, unsigned seq)
{
	return ((seq | 1) != (atomic_read(p->seq) | 1));
}

/* Wait until we can go */
void phaser_wait(phaser_t *p, unsigned seq)
{
	spin_threads(p, seq);

	/* Can we immediately go? */
	if (!(seq & 1))
	{
		while (1)
		{
			unsigned seq, total, count;

			add_count(p, &total, &count, &seq);

			if (count == total)
			{
				wake_threads(p);

				break;
			}

			spin_threads(p, seq);

			/* Can we go? */
			if (count < total) break;
		}
	}

	deref(p);
}

Where the code to wake all the sleeping threads is:


static void wake_threads(phaser_t *p)
{
	/* Simultaneously clear count, increment sequence number, and clear wait flag */
	unsigned seq = atomic_read(p->seq);

	if (xchg_64((unsigned long long *) &p->count, ((seq | 1ULL) + 255) << 32) & 1ULL << 32)
	{
		/* Wake up sleeping threads */
		sys_futex(&p->seq, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
	}
}

There is some subtlety to the above. We need to correctly handle the case where more threads than planned reach the phaser before it ticks over to the next sequence number. By marking the low bit of the sequence handle, we can notify the wait routine whether or not we succeeded in passing through the phaser. Since this misuse is likely to be rare, we won't bother "fixing" the phaser_test() routine though. It can spuriously return TRUE to no great harm, other than some inefficiency.
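To see why the low bit never causes a false mismatch, here is a sketch of the comparison that both phaser_test() and spin_threads() perform (the helper name is ours). Or-ing both values with 1 masks off the flag bit, so a handle still matches its own phase regardless of whether the "go" or "sleeping" flags happen to be set:

```c
/* Has the phaser moved past the phase named by this handle?
   Or-ing with 1 ignores the flag stored in the low bit of each value. */
static int phase_changed(unsigned cur_seq, unsigned handle)
{
	return (cur_seq | 1) != (handle | 1);
}
```

This works because wake_threads() advances the sequence number in steps of 256, so the counter proper never touches the low bit used for flags.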

In the above, we have deliberately abstracted away the internals of the phaser data structure. The reason for this is that phasers have one other feature that differs from barriers: threads are allowed to dynamically join and leave a phaser. This means that the total number of threads we are synchronizing can change at any time. This is problematic, as we need to atomically handle threads simultaneously joining, leaving and entering the phaser. If this is not done correctly, we could miss wakeups.

Thus we will place the total field next to the count field in the phaser structure. This allows the use of atomic instructions that simultaneously read and modify the two. However, we have a problem. In the barrier implementation, we placed the sequence number seq next to the count for this very same reason. It is impossible to arrange the three variables so that they are all next to each other whilst keeping every atomic operation naturally aligned.

What saves us is that the Intel platform allows non-aligned atomic instructions. These work as you might expect, but may be slower than their aligned counterparts. By ordering the three fields with the count in the middle and the total and sequence number on either side, we can satisfy all the constraints. For speed, we place the total first, because it will be accessed atomically many more times than the sequence number. The resulting structure layout looks like:


typedef struct phaser_t phaser_t;
struct phaser_t
{
	unsigned total;
	unsigned count;
	unsigned seq;
	unsigned refcount;
};

Note how in the above we haven't bothered to use unions to describe the overlapping atomic accesses used in the code. That just complicates the description, without any real gain. We can simply use casts to get the correctly sized operation.
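As a concrete sketch of one such overlapping access, here is the add_count() trick written with the gcc __sync_fetch_and_add() builtin in place of the article's atomic_xadd() wrapper (the helper name is ours; little-endian x86 is assumed, and the pointer cast mirrors the article's own style). A 64-bit xadd at &p->total sees total in the low 32 bits and count in the high 32 bits, so adding 1ULL << 32 bumps the count whilst reading the total:

```c
typedef struct phaser_t phaser_t;
struct phaser_t
{
	unsigned total;
	unsigned count;
	unsigned seq;
	unsigned refcount;
};

/* Bump count whilst atomically reading total (little-endian layout) */
static void xadd_count_sketch(phaser_t *p, unsigned *total, unsigned *count)
{
	unsigned long long ct = __sync_fetch_and_add((unsigned long long *) &p->total, 1ULL << 32);

	*count = ct >> 32;	/* Old count, from the high half */
	*total = ct;		/* Total, from the low half */
}
```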

The initialization and destruction look identical to their barrier counterparts:


/* Initialize a phaser with the given number of threads */
void phaser_init(phaser_t *p, unsigned count)
{
	p->seq = 0;
	p->count = 0;
	p->refcount = 1;

	/* Total of waiting threads */
	p->total = count - 1;
}

/* Destroy a phaser when it is unused */
void phaser_destroy(phaser_t *p)
{
	/* Drop the initial reference */
	atomic_add(&p->refcount, -1);

	/* Wait until all references to the barrier are dropped */
	while (1)
	{
		unsigned ref = atomic_read(p->refcount);

		if (!ref) return;

		sys_futex(&p->refcount, FUTEX_WAIT_PRIVATE, ref, NULL, NULL, 0);
	}
}

In the code for the signal and wait routines, we abstracted out the refcounting. Refcounting is needed, just like in the barrier case, because threads leaving the phaser read the phaser's state. To safely destroy a phaser, we need to wait until all such reads have occurred. Otherwise, those threads may attempt to read freed memory, with potentially disastrous results. The refcounting code is simple:


/* Reference counting for phaser object */
static void ref(phaser_t *p)
{
	atomic_add(&p->refcount, 1);
}

static void deref(phaser_t *p)
{
	/* Are we the last to wake up? */
	if (atomic_xadd(&p->refcount, -1) == 1)
	{
		/* Wake destroying thread */
		sys_futex(&p->refcount, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
	}
}
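The reason deref() works is that atomic_xadd() returns the value from before the decrement, so exactly one thread can observe the count dropping from one to zero. A C11 sketch of the same pattern, using <stdatomic.h> in place of the article's wrappers (the helper name is ours):

```c
#include <stdatomic.h>

/* Returns 1 for exactly the caller that dropped the last reference */
static int deref_sketch(atomic_uint *refcount)
{
	/* atomic_fetch_sub() returns the old value */
	return atomic_fetch_sub(refcount, 1) == 1;
}
```

In the phaser, that single last thread is then responsible for the futex wake of the destroying thread.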

Next we need the routines that atomically add to the count of threads waiting at the phaser, and that spin waiting for the phaser to be ready. The two are very similar to their barrier counterparts. The only difference is that we cannot simply read the total number of threads; we need to access that variable within the same atomic operation that updates the count of waiting threads:


/* Spin/sleep until the sequence number has changed */
static void spin_threads(phaser_t *p, unsigned seq)
{
	int i;

	seq |= 1;

	/* Spin a bit, waiting for the sequence number to change */
	for (i = 0; i < 1000; i++)
	{
		if ((atomic_read(p->seq) | 1) != seq) break;
	}

	/* Can we proceed? */
	while ((atomic_read(p->seq) | 1) == seq)
	{
		/* Hack - set a flag that says we are sleeping */
		*(volatile char *) &p->seq = 1;

		/* Sleep on it instead */
		sys_futex(&p->seq, FUTEX_WAIT_PRIVATE, seq, NULL, NULL, 0);
	}
}

/* Increment count, and atomically read total */
static void add_count(phaser_t *p, unsigned *total, unsigned *count, unsigned *seq)
{
	unsigned long long ct;

	/* Read sequence number first */
	*seq = atomic_read(p->seq);

	/* Then read total whilst updating the count */
	ct = atomic_xadd((unsigned long long *) &p->total, 1ULL << 32);

	*count = ct >> 32;
	*total = ct;
}

With the above, we can also construct a "sigwait" routine that both signals and waits. Its operation is identical to that of a barrier:


/* Signal and wait at a phaser - acts just like a barrier */
void phaser_sigwait(phaser_t *p)
{
	ref(p);

	while (1)
	{
		unsigned seq, total, count;

		add_count(p, &total, &count, &seq);

		if (count == total)
		{
			wake_threads(p);

			break;
		}

		spin_threads(p, seq);

		/* Can we go? */
		if (count < total) break;
	}

	deref(p);
}

Finally, we need the routines that add and remove threads from a phaser. Adding is simple: we just atomically increment the total. Removing is more complex, because the departing thread may be the very one the waiters are waiting for. If so, we need to wake the other waiting threads. Thus, the code looks similar to the add_count() routine:


/* Add ourself to a phaser - needs external synchronization */
void phaser_add(phaser_t *p)
{
	atomic_add(&p->total, 1);
}

/* Remove ourself from a phaser - returns 1 for last thread */
int phaser_remove(phaser_t *p)
{
	unsigned long long ct;
	unsigned count;
	unsigned total;

	ref(p);

	/* Atomically decrease total, whilst reading the count */
	ct = atomic_xadd((unsigned long long *) &p->total, -1);

	count = ct >> 32;
	total = ct;

	if (count == total) wake_threads(p);

	deref(p);

	/* Were we the last? */
	return !total;
}
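The single 64-bit subtraction in phaser_remove() relies on the same field layout as add_count(): subtracting one from the combined word decrements the total in the low half whilst leaving the count in the high half untouched, so long as the total is non-zero and no borrow propagates. A sketch of the arithmetic on a little-endian machine, again using the gcc __sync builtin and names of our own invention:

```c
typedef struct
{
	unsigned total;	/* Low 32 bits of the combined word */
	unsigned count;	/* High 32 bits */
} pair_sketch;

/* Decrement total whilst atomically snapshotting both fields */
static unsigned long long dec_total_sketch(pair_sketch *p)
{
	/* Adding -1 to the 64-bit word subtracts one from the low half */
	return __sync_fetch_and_add((unsigned long long *) p, -1ULL);
}
```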

The result is an efficient phaser implementation. By splitting the signal from the wait, we can speed up code that uses barriers. The extra overhead in doing so is minimal, and we gain a greatly increased overlap of synchronization and computation. By allowing threads to add and remove themselves from phasers, it is possible to efficiently construct deadlock-free synchronization graphs where threads only wait on the threads they need to. You might want to try phasers in your code to see how much they can improve performance...

Comments

sfuerst said...
The wake_threads() function was accidentally left out of the article. This has been fixed.
Bubba said...
How would one implement a Windows version?
