如果不能正常显示,请查看原文 , 或返回

Introduction to Lock-Free Algorithms

Lock-Free Algorithms

An Introduction

Concurrency

A property of systems whose global state is composed of the interleaved execution of independent or partially-independent components.

Parallelism

Parallel systems exhibit concurrency but one or more components may execute at the same time.

Concurrent Data Structures

A data structure that is shared by cooperating processes. Concurrent execution of operations may render a data structure in an inconsistent state if special precautions are not taken.

#include 

struct counter {
	int32_t value;
};

int32_t
counter_read(struct counter *counter)
{

	return counter->value;
}

void
counter_increment(struct counter *counter)
{

	counter->value++;
	return;
}

void
counter_decrement(struct counter *counter)
{

	counter->value--;
	return;
}

counter_increment:
	[...]
	movl    (%rax), %ecx
	leal    1(%ecx), %edx
	movl    %edx, (%rax)
	[...]


void
counter_increment(struct counter *counter)
{
	register int32_t ecx, edx;

	ecx = counter->value;
	edx = ecx + 1;
	counter->value = edx;

	return;
}


				counter->value = 0



	Thread A (counter_increment)			Thread B (counter_decrement)

1.	ecx = counter->value;
2.							ecx = counter->value;
3.	edx = ecx + 1;
4.							edx = ecx - 1;
5.	counter->value = edx;
6.							counter->value = edx;
7.	return;
8.							return;


				counter->value = -1

A mutex is an object which implements acquire and relinquish operations such that the execution following an acquire operation and up to the relinquish operation is executed in a mutually exclusive manner relative to the object implementing a mutex. A lock provides some form of exclusion guarantees.


				counter->value = 0



	Thread A (counter_increment)			Thread B (counter_decrement)

 1.	mutex_lock(&counter->mutex);
 2.							mutex_lock(&counter->mutex);
 3.	ecx = counter->value;
 4.	edx = ecx + 1;
 5.	counter->value = edx;
 6.	mutex_unlock(&counter->mutex);
 7.	return;
 8.							ecx = counter->value;
 9.							edx = ecx + 1;
10.							counter->value = edx;
11.							mutex_unlock(&counter->mutex);
12.							return;


				counter->value = 0

Locks are susceptible to a unique set of problems. A great number of these problems stem from the fact that locks are not composable.


  • Deadlock: impossibility of acquisition
  • Convoying: scheduling overhead and queue effects degrade system performance
  • Livelock: active processes fail to progress
  • Starvation: unfair distribution of lock acquisition
  • Inefficiency: pre-emption, granularity

Non-Blocking Data Structures

It is possible to implement concurrent data structures without the use of critical sections built on carefully selected atomic operations. These data structures are called non-blocking data structures.


void
counter_increment(struct counter *counter)
{

	fetch_and_add_32(&counter->value, 1);
	return;
}

Wait-Freedom

Per-operation progress guarantees. Every operation is guaranteed to complete in a finite number of steps.

Lock-Freedom

Per-object progress guarantees. It is guaranteed that some operation will always complete successfully in a finite number of steps.

Obstruction-Freedom

Non-blocking progress guarantee. At any point in time, a single thread is guaranteed to make progress if other threads are suspended. Partially completed operations must be abortable.

WF ⊂ LF ⊂ OF

Atomic Operations

Early systems supported little more than atomic loads and stores. A lot of basic constructs remain to be impractical.

Which atomic operations are worth supporting?

Three prevalent atomic operations aside from atomic loads and stores are:

  • fetch_and_φ
  • compare_and_swap
  • load_linked/store_conditional

/*
 * The following block of code is executed atomically.
 * The argument φ represents some function to be applied
 * to the target location with some optional argument
 * delta.
 */

T
fetch_and_φ(T *target, φ, [T delta])
{
	T previous;

	previous = *target;
	φ(target, [delta]);
	return previous;
}

/*
 * The following block of code is executed atomically.
 */

T
compare_and_swap(T *target, T current, T update)
{
	T previous;

	previous = *target;

	if (*target == current)
		*target = update;

	return previous;
}

/*
 * The following blocks of code are executed atomically.
 * The semantics for how reservations are handled varies
 * wildly across architectures. These can be especially
 * sensitive.
 */

static void *reservations[N];

T
load_linked(T *target)
{

	reservations[j] = target;
	return *target;
}

bool
store_conditional(T *target, T value)
{

	if (reservations[j] != target)
		return false;

	reservations[j] = NULL;
	*target = value;

	return true;
}

Consensus Problem

A set of processes each start with an input value from some domain D and communicate with each other by applying operations on these shared objects. They must eventually agree on a common input value and halt.


A consensus protocol is required to be:

  • valid: common decision value was an input value
  • consistent: never decide on distinct values
  • wait-free: finite number of steps

/*
 * X is some value that was not an input to any
 * process.
 */

T final = X;

T
decide(T input)
{
	T previous;

	previous = compare_and_swap(final, X, input);
	if (previous == X)
		return input;

	return previous;
}
	

/*
 * X is some value that was not an input to any
 * process.
 *
 * φ(X) != X
 */

T vote = X;
T winner[2];

T
decide_0(T input)
{
	T previous;

	winner[0] = input;
	previous = fetch_and_φ(vote);
	if (previous == X)
		return input;

	return winner[1];
}

T
decide_1(T input)
{
	T previous;

	winner[1] = input;
	previous = fetch_and_φ(vote);
	if (previous == X)
		return input;

	return winner[0];
}
	

A hierarchy exists for atomic operations such that an object lower in the hierarchy cannot implement an object higher in the hierarchy in a wait-free manner.

OperationConsensus Number


compare_and_swap, ll/sc
fetch_and_φ2
load/store1

A wait-free implementation of an object with consensus number n can be constructed from any other object with consensus number j where j >= n. The art form comes in constructing a practical implementation.

* Hierarchy is not rigid

Non-Blocking Stack

struct node {
	void *value;
	struct node *next;
};

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->value = value;
	entry->next = *top;
	*top = entry;
	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r;

	r = *top;
	*top = r->next;
	return r;
}

struct node {
	void *value;
	struct node *next;
};

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->value = value;

	do {
		entry->next = ck_pr_load_ptr(top);
	} while (ck_pr_cas_ptr(top, entry->next, entry) == false);

	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;

		next = ck_pr_load_ptr(&r->next);
	} while (ck_pr_cas_ptr(top, r, next) == false);

	return r;
}

Linearizability

A consistency model that requires that operations appear to been completed atomically at some point in between operation invocation and completion. These points are called linearization points.

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->value = value;

	do {
		entry->next = ck_pr_load_ptr(top);
	} while (ck_pr_cas_ptr(top, entry->next, entry) == false); <=====================

	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;

		next = ck_pr_load_ptr(&r->next);
	} while (ck_pr_cas_ptr(top, r, next) == false);            <=====================

	return r;
}

ABA Problem

A false positive execution of a CAS-based speculation on a shared location ...

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;

		next = ck_pr_load_ptr(&r->next);


	} while (ck_pr_cas_ptr(top, r, next) == false);

	return r;
}

Counters are a popular mechanism for ABA-prevention by versioning objects that are the target of CAS-based speculation. Unfortunately, counters can be insufficiently wide for platforms that lack a CAS2 operation.

struct ck_stack_entry {
	struct ck_stack_entry *next;
};

struct ck_stack {
	struct ck_stack_entry *head;
	char *generation CK_CC_PACKED;
};

struct ck_stack_entry *
ck_stack_pop_mpmc(struct ck_stack *target)
{
	struct ck_stack original, update;

	original.generation = ck_pr_load_ptr(&target->generation);
	original.head = ck_pr_load_ptr(&target->head);
	if (original.head == NULL)
		return (NULL);

	update.generation = original.generation + 1;
	update.head = original.head->next;

	while (ck_pr_cas_ptr_2_value(target, &original, &update, &original) == false) {
		if (original.head == NULL)
			return (NULL);

		ck_pr_store_ptr(&update.generation, original.generation + 1);
		ck_pr_store_ptr(&update.head, original.head->next);
	}

	return (original.head);
}

Best-case latency of pop implementations as measured by serialized rdtsc.


Processorcmpxchgcmpxchg16b
Intel Core i3 540 3.07GHz11ns17ns
Intel Xeon E5530 2.40GHz12ns20ns

Memory Ordering

Defines the semantics of memory operation visibility and ordering.

		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		1. movq $40, (%rsi)                  1. movq $40, (%rdi)
		2. movq $41, 64(%rsi)                2. movq $41, 64(%rdi)
		3. movq (%rdi), %rax                 3. movq (%rsi), %rcx
		4. movq 64(%rdi), %rbx               4. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 40                             rcx = 40
		rbx = 41                             rdx = 41


Sequential consistency guarantees a total ordering to all operations.

Modern processors with instruction-level parallelism may batch stores in an intermediate store buffer. Loads may also be served from the store buffer.


		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		5. movq $40, (%rsi)                  1. movq $40, (%rdi)
		6. movq $41, 64(%rsi)                2. movq $41, 64(%rdi)
		3. movq (%rdi), %rax                 3. movq (%rsi), %rcx
		4. movq 64(%rdi), %rbx               4. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 40                             rcx = 0 
		rbx = 41                             rdx = 0

		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		1. movq $40, (%rsi)                  5. movq $40, (%rdi)
		2. movq $41, 64(%rsi)                6. movq $41, 64(%rdi)
		3. movq (%rdi), %rax                 3. movq (%rsi), %rcx
		4. movq 64(%rdi), %rbx               4. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 0                              rcx = 40
		rbx = 0                              rdx = 41

		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		5. movq $40, (%rsi)                  5. movq $40, (%rdi)
		6. movq $41, 64(%rsi)                6. movq $41, 64(%rdi)
		3. movq (%rdi), %rax                 3. movq (%rsi), %rcx
		4. movq 64(%rdi), %rbx               4. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 0                              rcx = 0
		rbx = 0                              rdx = 0

Total Store Ordering

  • Loads are not reordered with respect to each other
  • Stores are not reordered with respect to each other
  • Stores are not reordered with respect to prior loads
  • Loads can be reordered with respect to prior stores
  • Stores to the same location have a global ordering
  • Atomics are serializing

		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		1. movq $40, (%rsi)                  1. movq $40, (%rdi)
		2. movq $41, 64(%rsi)                2. movq $41, 64(%rdi)
		3. mfence                            3. mfence
		4. movq (%rdi), %rax                 4. movq (%rsi), %rcx
		5. movq 64(%rdi), %rbx               5. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 40                             rcx = 40
		rbx = 41                             rdx = 41

		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		1. movq $40, (%rsi)                  1. movq $40, (%rdi)
		2. movq $41, 64(%rsi)                2. movq $41, 64(%rdi)
		3. sfence                            3. sfence
		4. lfence                            4. lfence
		5. movq (%rdi), %rax                 5. movq (%rsi), %rcx
		6. movq 64(%rdi), %rbx               6. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 40                             rcx = 40
		rbx = 41                             rdx = 41

Partial Store Ordering

  • Loads are not reordered with respect to each other
  • Stores can be reordered with respect to each other
  • Loads can be reordered with respect to stores
  • Stores to the same location have a global ordering
  • Atomics can be reordered with respect to stores

		rsi = 0xbfbfbfbfbfbfbf00          rdi = 0x0101010101010100

		   Processor 1                          Processor 2

		1. movq $40, (%rsi)                  1. movq $40, (%rdi)
		2. sfence                            2. sfence
		3. movq $41, 64(%rsi)                3. movq $41, 64(%rdi)
		4. mfence                            4. mfence
		5. movq (%rdi), %rax                 5. movq (%rsi), %rcx
		6. movq 64(%rdi), %rbx               6. movq 64(%rsi), %rdx

		----------------------------------------------------------

		rax = 40                             rcx = 40
		rbx = 41                             rdx = 41

Relaxed Store Ordering

  • Loads are reordered with respect to each other
  • Loads can be reordered with respect to stores
  • Stores can be reordered with respect to each other
  • Stores to the same location have a global ordering
  • Atomics can be reordered with respect to stores
  • Atomics can be reordered with respect to loads

Safe Memory Reclamation

Allows for the eventual and safe destruction of concurrently shared objects. Implementations presented are not suitable for production-use unless noted otherwise.

This is really an issue for unmanaged languages such as C or C++. GCs solve this problem for you.

Reference Counting

Reference counting is vulnerable to cycles and can become a scalability bottleneck for short-lived references

Traditional reference counting schemes do not work for most non-blocking data structures.

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->value = value;
	entry->next = *top;
	*top = entry;
	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r;

	r = *top;
	*top = r->next;
	return r;
}

struct node *
stack_peek(struct node *top)
{

	return top;
}

static mutex_t stack_mutex;

struct node {
	void *value;
	unsigned int ref;
	struct node *next;
};

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->ref = 1;
	entry->value = value;

	mutex_lock(&stack_mutex);
	entry->next = *top;
	*top = entry;
	mutex_unlock(&stack_mutex);
	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r;

	mutex_lock(&stack_mutex);
	r = *top;
	*top = r->next;
	mutex_unlock(&stack_mutex);
	return r;
}

struct node *
stack_peek(struct node *top)
{
	struct node *r;

	mutex_lock(&stack_mutex);
	r = *top;
	mutex_unlock(&stack_mutex);
	return r;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r;

	mutex_lock(&stack_mutex);
	r = *top;
	*top = r->next;
	mutex_unlock(&stack_mutex);
	return r;
}

struct node *
stack_peek(struct node *top)
{
	struct node *r;

	mutex_lock(&stack_mutex);
	r = *top;
	ck_pr_inc_uint(&r->ref);
	mutex_unlock(&stack_mutex);
	return r;
}

void
node_free(struct node *node)
{
	bool r;

	ck_pr_dec_uint_zero(&node->ref, &r);
	if (r == true)
		free(node);

	return;
}

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->value = value;

	do {
		entry->next = ck_pr_load_ptr(top);
	} while (ck_pr_cas_ptr(top, entry->next, entry) == false);

	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;

		next = ck_pr_load_ptr(&r->next);
	} while (ck_pr_cas_ptr(top, r, next) == false);

	return r;
}

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->ref = 1;
	entry->value = value;

	do {
		entry->next = ck_pr_load_ptr(top);
	} while (ck_pr_cas_ptr(top, entry->next, entry) == false);

	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;

		next = ck_pr_load_ptr(&r->next);
	} while (ck_pr_cas_ptr(top, r, next) == false);

	ck_pr_dec_uint(&r->ref);
	return r;
}

In-band reference counting is insufficient.

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->ref = 1;
	entry->value = value;

	do {
		entry->next = ck_pr_load_ptr(top);
	} while (ck_pr_cas_ptr(top, entry->next, entry) == false);

	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;
-------------------------------------------------------------------
		next = ck_pr_load_ptr(&r->next);
	} while (ck_pr_cas_ptr(top, r, next) == false);

	ck_pr_dec_uint(&r->ref);
	return r;
}

In-band reference counting is insufficient.

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	for (;;) {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;
-------------------------------------------------------------------
		ck_pr_inc_uint(&r->ref);
		next = ck_pr_load_ptr(&r->next);

		if (ck_pr_cas_ptr(top, r, next) == false) {
			ck_pr_dec_uint(&r->ref);
			continue;
		}

		break;
	}

	ck_pr_dec_uint(&r->ref);
	return r;
}

In-band reference counting is insufficient.

static unsigned int stack_ref;

void
stack_push(struct node **top, struct node *entry, void *value)
{

	entry->value = value;

	do {
		entry->next = ck_pr_load_ptr(top);
	} while (ck_pr_cas_ptr(top, entry->next, entry) == false);

	return;
}

struct node *
stack_pop(struct node **top)
{
	struct node *r, *next;

	ck_pr_inc_uint(&stack_ref);

	do {
		r = ck_pr_load_ptr(top);
		if (r == NULL)
			return NULL;

		next = ck_pr_load_ptr(&r->next);
	} while (ck_pr_cas_ptr(top, r, next) == false);

	return r;
}

struct node *
stack_peek(struct node *top)
{

	ck_pr_inc_uint(&stack_ref);
	return ck_pr_load_ptr(&top);
}

void
node_free(struct node *n)
{
	bool r;

	ck_pr_dec_uint_zero(&stack_ref, &r);
	if (r == true)
		free_all(n);
	else
		defer(n);

	return;
}

Out-of-band reference counting can provide safety.

node_free will only succeed at the time it was called if all threads who have called stack_pop or stack_peek have already executed a corresponding node_free

Proxy Collectors

Amortized reference counting scheme that leverages counting of some parent proxy object.

struct proxy {
	ck_stack retirement;
	unsigned int ref;
	unsigned int ticks;
};

static struct proxy *proxy_current;
static mutex_t proxy_mutex;

static struct proxy *
proxy_new(void)
{
	struct proxy *r;

	r = malloc(sizeof(struct proxy));
	r->ref = 0;
	ck_stack_init(&r->retirement);
	return r;
}

void
proxy_free(struct proxy *proxy, ck_stack_entry_t *entry)
{

	ck_stack_push_mpmc(&proxy->retirement, entry);
	return;
}

#define PROXY_TICKS 1000

struct proxy *
proxy_get(struct proxy *c)
{
	struct proxy *p = NULL;
	struct proxy *r;

	mutex_lock(&proxy_mutex);
	
	if (proxy_current == NULL) {
		proxy_current = proxy_new();
	} else if (++proxy_current->ticks > PROXY_TICKS) {
		p = proxy_current;
		proxy_current = proxy_new();
	}

	if (c != proxy_current)
		proxy_current->ref++;

	r = proxy_current;
	mutex_unlock(&proxy_mutex);

	if (p != NULL)
		proxy_destroy(p);

	return r;
}

static void
proxy_destroy(struct proxy *c)
{
	bool r;
	ck_stack_entry_t *cursor;

	ck_pr_dec_uint_zero(&c->ref, &r)
	if (r == false)
		return;

	CK_STACK_FOREACH(&c->retirement, cursor) {
		free(container_of(cursor, T, stack_entry));
	}

	free(c);
	return;
}

void *
thread(void *s)
{
	ck_stack_entry_t *cursor;
	struct proxy *current = proxy_get(NULL);
	ck_stack_t *stack = s;
	unsigned int i = 0;

	for (;;) {
		cursor = ck_stack_pop_mpmc(stack);
		if (cursor == NULL)
			continue;

		function(cursor);
		proxy_free(current, cursor);

		if (i++ & 1023)
			current = proxy_get(current);
	}

	return;
}	

Quiescent State-Based Reclamation

Deferred reclamation triggered by the transition across one or more quiescent states (where no shared references are held) by all threads.

void *
thread(void *s)
{
	ck_stack_entry_t *cursor;
	ck_stack_t *stack = s;

	for (;;) {
------------------------------------------------------------
		cursor = ck_stack_pop_mpmc(stack);
		if (cursor == NULL)
			continue;

		function(cursor);
		qsbr_free(cursor);
	}

	return;
}	

void *
thread(void *s)
{
	ck_stack_entry_t *cursor;
	ck_stack_t *stack = s;

	qsbr_init();

	for (;;) {
		qsbr_tick(); <==========================================

		cursor = ck_stack_pop_mpmc(stack);
		if (cursor == NULL)
			continue;

		function(cursor);
		qsbr_free(cursor);
	}

	return;
}	

struct qsbr_generation {
	unsigned int snapshot[NR];
	unsigned int n_entries;
	unsigned int counter;
	ck_stack_t retirement;
	struct qbsr_generation *next;
};
static struct qsbr_generation *states[NR];

void
qsbr_init(void)
{

	states[MYTHREAD] = calloc(1, sizeof(struct qsbr_generation));
	return;
}

void
qsbr_tick(void)
{

	ck_pr_store_uint(&states[MYTHREAD]->counter, states[MYTHREAD]->counter + 1);
	return;
}

static void
qsbr_snapshot(unsigned int *snapshot)
{
	size_t j;

	for (j = 0; j < NR; j++)
		snapshot[j] = ck_pr_load_uint(&states[j]->counter);

	return;	
}

void
qsbr_free(ck_stack_entry_t *entry)
{
	struct qsbr_generation *g;

	if (states[MYTHREAD] == NULL)
		states[MYTHREAD] = calloc(1, sizeof(struct qsbr_generation));

	g = states[MYTHREAD];
	ck_stack_push_spnc(&g->retirement, entry);
	g->n_entries++;

	if (g->n_entries > QSBR_LIMIT) {
		qsbr_snapshot(&g->snapshot);
		qsbr_purge(g);
		states[MYTHREAD] = calloc(1, sizeof(struct qsbr_generation));
		states[MYTHREAD]->next = g;
	}

	return;
}

void
qsbr_purge(struct qsbr_generation *generation)
{
	struct qsbr_generation *g, *n;

	g = generation->next;
	if (g == NULL)
		return;

	if (qsbr_quiesced(g) != NR)
		return;

	generation->next = NULL;

	do {
		ck_stack_entry_t *cursor;

		CK_STACK_FOREACH(&g->retirement, cursor)
			free(container_of(cursor, T, stack_entry));

		n = g->next;
		free(g);	
	} while (g = n);

	return;
}

static unsigned int
qsbr_quiesced(struct qsbr_generation *g)
{
	unsigned int n = 0;
	size_t j;

	for (j = 0; j < NR; j++)
		n += ck_pr_load_uint(&states[j]->counter) != g->snapshot[j];

	return n;
}

Epoch-Based Reclamation

Deferred reclamation triggered by the transition of a globally observed epoch counter. Requires the use of read-side protected sections.

static ck_epoch_t global_epoch;

void
node_free(ck_stack_entry_t *entry)
{

	free(container_of(cursor, T, stack_entry));
	return;
}

void *
thread(void *s)
{
	ck_stack_entry_t *cursor;
	ck_stack_t *stack = s;
	ck_epoch_record_t record;

	ck_epoch_register(&global_epoch, &record);

	for (;;) {
		ck_epoch_write_begin(&record);

		cursor = ck_stack_pop_mpmc(stack);
		if (cursor == NULL) {
			ck_epoch_write_end(&record);
			continue;
		}

		function(cursor);

		ck_epoch_write_end(&record);
		ck_epoch_free(&record, cursor, node_free); 
	}

	return;
}	


Thread 1 is only entering read sections. Thread 2 is only entering write sections.

void
ck_epoch_write_begin(struct ck_epoch_record *record)
{
        struct ck_epoch *global = record->global;

        ck_pr_store_uint(&record->active, record->active + 1);

        /*
         * In the case of recursive write sections, avoid ticking
         * over global epoch.
         */
        if (record->active > 1)
                return;

        for (;;) {
                if (ck_epoch_reclaim(record) == true)
                        break;

                if (++record->delta >= global->threshold) {
                        record->delta = 0;
                        ck_epoch_tick(global, record);
                        continue;
                }

                break;
        }

        return;
}

CK_CC_INLINE static void
ck_epoch_write_end(ck_epoch_record_t *record)
{

        ck_pr_fence_store();
        ck_pr_store_uint(&record->active, record->active - 1);
        return;
}

void
ck_epoch_tick(struct ck_epoch *global, struct ck_epoch_record *record)
{
        struct ck_epoch_record *c_record;
        ck_stack_entry_t *cursor;
        unsigned int g_epoch = ck_pr_load_uint(&global->epoch);

        g_epoch &= CK_EPOCH_LENGTH - 1;
        CK_STACK_FOREACH(&global->records, cursor) {
                c_record = ck_epoch_record_container(cursor);
                if (ck_pr_load_uint(&c_record->status) == CK_EPOCH_FREE ||
                    c_record == record)
                        continue;

                if (ck_pr_load_uint(&c_record->active) != 0 &&
                    ck_pr_load_uint(&c_record->epoch) != g_epoch)
                        return;
        }

        ck_pr_inc_uint(&global->epoch);
        return;
}

bool
ck_epoch_reclaim(struct ck_epoch_record *record)
{
        struct ck_epoch *global = record->global;
        unsigned int g_epoch = ck_pr_load_uint(&global->epoch);
        unsigned int epoch = record->epoch;
        ck_stack_entry_t *next, *cursor;

        g_epoch &= CK_EPOCH_LENGTH - 1;
        if (epoch == g_epoch)
                return false;

        /*
         * This means all threads with a potential reference to a
         * hazard pointer will have a view as new as or newer than
         * the calling thread. No active reference should exist to
         * any object in the record's pending list.
         */
        CK_STACK_FOREACH_SAFE(&record->pending[g_epoch], cursor, next) {
                struct ck_epoch_entry *entry = ck_epoch_entry_container(cursor);
                entry->destroy(entry);
                record->n_pending--;
                record->n_reclamations++;
        }

        ck_stack_init(&record->pending[g_epoch]);
        record->epoch = g_epoch;
        record->delta = 0;
        return true;
}

void
ck_epoch_free(struct ck_epoch_record *record,
              ck_epoch_entry_t *entry,
              ck_epoch_destructor_t destroy)
{
        unsigned int epoch = ck_pr_load_uint(&record->epoch);
        struct ck_epoch *global = record->global;

        entry->destroy = destroy;
        ck_stack_push_spnc(&record->pending[epoch], &entry->stack_entry);
        record->n_pending += 1;

        if (record->n_pending > record->n_peak)
                record->n_peak = record->n_pending;

        if (record->n_pending >= global->threshold && ck_epoch_reclaim(record) == false)
                ck_epoch_tick(global, record);

        return;
}

Hazard Pointers

A wait-free safe memory reclamation mechanism that implements tracing-like semantics. Requires modification of non-blocking algorithms.

static ck_hp_t stack_hp;
static ck_stack_t stack;

void
thread(void *unused)
{
	ck_hp_record_t record;
	void *pointers;

	ck_hp_register(&stack_hp, &record, &pointers);

	for (;;) {
		ck_hp_stack_push_mpmc(&stack, new_entry());
		s = ck_hp_stack_pop_mpmc(&record, &stack);
		e = stack_container(s);
		ck_hp_free(&record, &e->hazard, e, s);
	}

	return;
}

CK_CC_INLINE static void
ck_hp_stack_push_mpmc(struct ck_stack *target, struct ck_stack_entry *entry)
{

        ck_stack_push_upmc(target, entry);
        return;
}

CK_CC_INLINE static void *
ck_hp_stack_pop_mpmc(ck_hp_record_t *record, struct ck_stack *target)
{
        struct ck_stack_entry *entry, *update;

	for (;;) {
		do {
			entry = ck_pr_load_ptr(&target->head);
			if (entry == NULL)
				return NULL;

			ck_hp_set(record, 0, entry);
			ck_pr_fence_memory();
		} while (entry != ck_pr_load_ptr(&target->head));

		if (ck_pr_cas_ptr(&target->head, entry, entry->next, &entry) == true)
			break;
	}

        return entry;
}

void
ck_hp_free(struct ck_hp_record *thread,
           struct ck_hp_hazard *hazard,
           void *data,
           void *pointer)
{
        struct ck_hp *global;

        global = ck_pr_load_ptr(&thread->global);
        ck_pr_store_ptr(&hazard->data, data);
        ck_pr_store_ptr(&hazard->pointer, pointer);
        ck_stack_push_spnc(&thread->pending, &hazard->pending_entry);

        thread->n_pending += 1;
        if (thread->n_pending > thread->n_peak)
                thread->n_peak = thread->n_pending;

        if (thread->n_pending >= global->threshold)
                ck_hp_reclaim(thread);

        return;
}

void
ck_hp_reclaim(struct ck_hp_record *thread)
{
        struct ck_hp *global = thread->global;
        ck_stack_entry_t *previous, *entry, *next;
        struct ck_hp_hazard *hazard;
        void *match;

        previous = NULL;
        CK_STACK_FOREACH_SAFE(&thread->pending, entry, next) {
                hazard = ck_hp_hazard_container(entry);

                if (ck_hp_member_scan(global, global->degree, hazard->pointer) == true) {
                        previous = entry;
                        continue;
                }

                thread->n_pending -= 1;

                /* Remove from the pending stack. */
                if (previous)
                        CK_STACK_NEXT(previous) = CK_STACK_NEXT(entry);
                else
                        CK_STACK_FIRST(&thread->pending) = CK_STACK_NEXT(entry);

                /* The entry is now safe to destroy. */
                global->destroy(hazard->data);
                thread->n_reclamations++;
        }

        return;
}

Process 1                                                         Process 2

[...]

do {
	entry = ck_pr_load_ptr(&target->head);
	if (entry == NULL)
		return NULL;                                      [...]
================================================================= ck_pr_cas_ptr(...) -> free(...)
	ck_hp_set(record, 0, entry);                              [...]
	ck_pr_fence_memory();
} while (entry != ck_pr_load_ptr(&target->head));

ck_pr_cas_ptr(...)

[...]

Process 1                                                         Process 2

[...]

do {
	entry = ck_pr_load_ptr(&target->head);
	if (entry == NULL)
		return NULL;
	ck_hp_set(record, 0, entry);
	ck_pr_fence_memory();
} while (entry != ck_pr_load_ptr(&target->head));
                                                                  [...]
================================================================= ck_pr_cas_ptr(...) -> free(...)
                                                                  [...]
ck_pr_cas_ptr(...)

[...]

Acknowledgements

Thanks to the following individuals for providing feedback for this deck and/or helping organize the event.

Abel Mathew, Brian Feldman, Brian O'Kelley, David Crawford, Devon H. O'Dell, Igor Shindel, Jessica Edwards, Joseph Huttner, Paul Khuong, Wez Furlong, William Lee Irwin III

References

Books

Libraries

Papers

Websites

THE END

Samy Bahra / appnexus.com

返回