
Implementation of CAS lock-free queue

2022-08-04 06:29:00 KuoGavin



1. Basic principle

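The technique originates from a paper presented in October 1994 at an international conference on parallel and distributed systems [Implementing Lock-Free Queues (PDF)]. It builds on the CAS instruction (Compare And Swap; see the Wikipedia entry on CAS). The behavior of CAS can be understood from the following code: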

// Semantics of CAS: the whole body executes as a single atomic step.
bool CAS(int* pAddr, int nExpected, int nNew) {
	if (*pAddr == nExpected) {
		*pAddr = nNew;
		return true;
	}
	return false;
}
// CAS returns a bool that tells whether the atomic swap was successful

As the name suggests, CAS is an atomic operation that compares and exchanges data. At the CPU level it corresponds to the cmpxchg instruction.
The operations inside a lock-free queue are themselves atomic, which avoids the unpredictable behavior that can arise when multiple threads call into the queue at the same time; in other words, CAS provides the synchronization between threads. The core primitive is the function __sync_bool_compare_and_swap (a GCC builtin). Its return type is bool: it returns true if the atomic swap succeeds and false if it fails.
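The success and failure cases of CAS, and the usual retry loop built on top of it, can be sketched as follows. This is a minimal illustration that swaps in the standard C++11 std::atomic API instead of the GCC builtin named above; the helper names cas, cas_add and cas_demo are hypothetical and exist only for this example.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper mirroring the CAS(pAddr, nExpected, nNew) signature above.
bool cas(std::atomic<int>& value, int expected, int desired) {
    return value.compare_exchange_strong(expected, desired);
}

// Atomically adds `delta` to `value` with a CAS retry loop; returns the new value.
int cas_add(std::atomic<int>& value, int delta) {
    int expected = value.load();
    // On failure, compare_exchange_weak writes the value it actually found
    // into `expected`, so every retry starts from a fresh snapshot.
    while (!value.compare_exchange_weak(expected, expected + delta)) {
    }
    return expected + delta;
}

// Single-threaded walk-through of the success and failure cases.
void cas_demo() {
    std::atomic<int> v(5);
    assert(cas(v, 5, 7));    // value matched the expectation: swapped to 7
    assert(!cas(v, 5, 9));   // stale expectation: nothing is written
    assert(v.load() == 7);
    assert(cas_add(v, 3) == 10);
}
```

The retry loop in cas_add is the same pattern the queue code relies on: take a snapshot, compute the new state, and publish it only if nobody else changed the location in the meantime.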



2. Code implementation


2.1 Implement a lock-free queue using a linked list

Enqueue operation

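Enqueue(EleType x) {
	Node* q = new Node();
	q->x = x;
	q->next = nullptr;
	Node* p;	// declared outside the loop so it is visible after it
	do {
		p = tail;	// snapshot of the tail pointer
	} while(!CAS(p->next, nullptr, q));	// link q behind the tail
	CAS(tail, p, q);	// advance tail to the new node
}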

This implementation has a problem. If a thread performing an enqueue is suspended after its CAS(p->next, nullptr, q) succeeds but before CAS(tail, p, q) executes, tail is never advanced to the newly added node. For every other enqueuing thread, CAS(p->next, nullptr, q) then always returns false, because p->next has already been changed to q while tail is still p and has not been updated to q. Those threads loop forever.

For this reason the enqueue operation can be improved: inside the do-while loop, first find the actual tail node of the queue, that is, the node with p->next == nullptr, and only then replace the tail node. The code is as follows:

Enqueue(EleType x) {
	Node* q = new Node();
	q->x = x;
	q->next = nullptr;

	Node* p = tail;
	Node* oldP = tail;
	do {
		while(p->next != nullptr)
			p = p->next;
	} while(!CAS(p->next, nullptr, q));
	CAS(tail, oldP, q);
}

With this version, even if a thread is suspended before updating tail as described above, the other threads walk p forward to the actual tail node of the queue inside the do-while loop. CAS(p->next, nullptr, q) can therefore return true, the loop ends, and the tail pointer is updated.

Dequeue operation

Dequeue() {
	Node* p;	// declared outside the loop so it is visible after it
	do {
		p = head->next;
		if(p == nullptr)
			return EMPTY;
	} while(!CAS(head->next, p, p->next));	// retry until the CAS succeeds
	return p->x;
}
// Here head is a dummy node

Implementation of the template lock-free queue class

With the ideas above, a template lock-free queue class can be implemented. The code is as follows:

// Define a linked list to implement the queue
#include <cstddef>
#include <cstdio>

template <typename ElemType>
struct qnode // linked-list node
{
  struct qnode *_next;
  ElemType _data;
};

template <typename ElemType>
class Queue
{
private:
  // _head is not fixed: it points to a different node after each pop
  struct qnode<ElemType> *volatile _head = NULL;
  struct qnode<ElemType> *volatile _tail = NULL;

public:
  Queue() {
    _head = _tail = new qnode<ElemType>; // dummy node that stores no data
    _head->_next = NULL;
    _tail->_next = NULL;
    printf("Queue _head:%p\n", (void*)_head);
  }

  void push_list(const ElemType& e) {
    struct qnode<ElemType>* p = new qnode<ElemType>;
    if (!p) return; // allocation of p failed
    p->_next = NULL;
    p->_data = e;

    struct qnode<ElemType>* t = _tail;
    struct qnode<ElemType>* old_t = _tail;

    do {
      // A non-NULL _next means t is not the real tail, so move to the next node
      while (t->_next != NULL) {
        t = t->_next;
      }
    // If t->_next is still NULL, replace it with p
    } while (!__sync_bool_compare_and_swap(&t->_next, NULL, p));
    // If the tail still equals the original tail, replace it with p
    __sync_bool_compare_and_swap(&_tail, old_t, p);
  }

  bool pop_list(ElemType& e) {
    // e is an output parameter that receives the value at the front of the queue
    struct qnode<ElemType>* p = NULL;
    struct qnode<ElemType>* next = NULL;
    do {
      p = _head;
      next = p->_next;
      if (next == NULL) return false; // the queue is empty
      // _head is a dummy node, so the front value lives in its successor
      e = next->_data;
      // If _head still equals p, advance it to p's successor
    } while (!__sync_bool_compare_and_swap(&_head, p, next));

    delete p; // p is the old dummy node; next is the new one
    p = NULL;
    return true;
  }
};

This template class records both the head and the tail node of the queue. The head node of the linked list stores no data, that is, it acts as a dummy node:

  • When pushing to the tail, CAS is first used to check whether the current node is the actual tail of the queue. If it is, its next pointer is nullptr, so that pointer is replaced with the new node and the tail pointer is updated. If it is not, the do-while loop walks forward until it finds the actual tail node;
  • When popping from the head, the do-while loop likewise checks whether the queue is non-empty. If it is, the value at the front of the queue is returned through the output parameter, the old head node is deleted, and the head pointer is updated;
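The push and pop steps described above can also be sketched with the standard C++11 std::atomic API. This is a hedged illustration, not the author's class: the names LockFreeQueue, push, pop and fifo_demo are hypothetical, and the sketch deliberately sidesteps safe memory reclamation by freeing nodes only in the destructor, so popped nodes stay allocated for the lifetime of the queue.

```cpp
#include <atomic>
#include <cassert>

template <typename T>
class LockFreeQueue {
    struct Node {
        T data{};
        std::atomic<Node*> next{nullptr};
    };
    Node* const first_;        // original dummy node; start of the chain to free
    std::atomic<Node*> head_;  // current dummy node
    std::atomic<Node*> tail_;  // hint: at or before the actual tail

public:
    LockFreeQueue() : first_(new Node), head_(first_), tail_(first_) {}
    LockFreeQueue(const LockFreeQueue&) = delete;
    LockFreeQueue& operator=(const LockFreeQueue&) = delete;

    // Assumption of this sketch: all nodes are released here, never in pop().
    ~LockFreeQueue() {
        for (Node* n = first_; n != nullptr;) {
            Node* next = n->next.load();
            delete n;
            n = next;
        }
    }

    void push(const T& value) {
        Node* node = new Node;
        node->data = value;
        Node* old_tail = tail_.load();
        Node* t = old_tail;
        for (;;) {
            Node* next = nullptr;
            // Link the node only if t is still the actual tail.
            if (t->next.compare_exchange_strong(next, node)) break;
            t = next;  // someone else appended first; walk toward the tail
        }
        // Best effort: advance the tail hint if nobody moved it meanwhile.
        tail_.compare_exchange_strong(old_tail, node);
    }

    bool pop(T& out) {
        Node* p = head_.load();
        for (;;) {
            Node* next = p->next.load();
            if (next == nullptr) return false;  // queue is empty
            T value = next->data;               // read before publishing the new head
            // On failure, p is refreshed with the current head and we retry.
            if (head_.compare_exchange_weak(p, next)) {
                out = value;
                return true;
            }
        }
    }
};

// Single-threaded smoke test of FIFO order.
void fifo_demo() {
    LockFreeQueue<int> q;
    int v = 0;
    assert(!q.pop(v));
    for (int i = 1; i <= 3; ++i) q.push(i);
    for (int i = 1; i <= 3; ++i) {
        assert(q.pop(v) && v == i);
    }
    assert(!q.pop(v));
}
```

Deferring every delete to the destructor trades memory for simplicity: because no node address is reused while the queue is alive, a stale pointer held by a slow thread never refers to freed or recycled memory.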

2.2 Implement a circular lock-free queue using an array

The queue can also be implemented with an array. Once the memory for the ring array has been allocated, no further memory allocation or release is involved:

  • The queue is implemented as a circular (ring) array;
  • Besides data values, an element of the queue can take three special values: HEAD, TAIL and EMPTY;
  • All elements of the array are initialized to EMPTY, except for two adjacent elements that are initialized to HEAD and TAIL; this state represents an empty queue;
  • Enqueue operation. To enqueue a value x, locate the position of TAIL and use a double-CAS to update (TAIL, EMPTY) to (x, TAIL). Note that if (TAIL, EMPTY) cannot be found, the queue is full.
  • Dequeue operation. Locate the position of HEAD, update (HEAD, x) to (EMPTY, HEAD), and return x. Note likewise that if x is TAIL, the queue is empty.
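The state transitions listed above can be traced with a small single-threaded model. This is only a sketch of the bookkeeping, not a concurrent implementation: the helper double_cas below is a plain, non-atomic stand-in for a real double-CAS, the class name RingQueueModel is hypothetical, and data values are assumed to be non-negative so that negative numbers can serve as the HEAD, TAIL and EMPTY markers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Marker values; this sketch assumes real data values are non-negative.
constexpr int HEAD = -1, TAIL = -2, EMPTY = -3;

class RingQueueModel {
    std::vector<int> cells_;
    std::size_t head_ = 0;  // index of the HEAD marker
    std::size_t tail_ = 1;  // index of the TAIL marker

    // Stand-in for double-CAS: updates two cells only if both hold the
    // expected values. NOT atomic; a real queue needs hardware support here.
    bool double_cas(std::size_t i, std::size_t j,
                    int exp_i, int exp_j, int new_i, int new_j) {
        if (cells_[i] != exp_i || cells_[j] != exp_j) return false;
        cells_[i] = new_i;
        cells_[j] = new_j;
        return true;
    }

public:
    // n cells hold at most n - 2 values, since HEAD and TAIL occupy two cells.
    explicit RingQueueModel(std::size_t n) : cells_(n, EMPTY) {
        assert(n >= 3);
        cells_[0] = HEAD;
        cells_[1] = TAIL;  // HEAD directly followed by TAIL: empty queue
    }

    bool enqueue(int x) {
        std::size_t next = (tail_ + 1) % cells_.size();
        // (TAIL, EMPTY) -> (x, TAIL); failure means the queue is full.
        if (!double_cas(tail_, next, TAIL, EMPTY, x, TAIL)) return false;
        tail_ = next;
        return true;
    }

    bool dequeue(int& out) {
        std::size_t next = (head_ + 1) % cells_.size();
        int x = cells_[next];
        if (x == TAIL) return false;  // HEAD followed by TAIL: empty queue
        // (HEAD, x) -> (EMPTY, HEAD)
        if (!double_cas(head_, next, HEAD, x, EMPTY, HEAD)) return false;
        out = x;
        head_ = next;
        return true;
    }
};
```

With four cells, for example, the array goes from (HEAD, TAIL, EMPTY, EMPTY) to (HEAD, 1, TAIL, EMPTY) after enqueuing 1, and a third enqueue fails because the cell after TAIL holds HEAD rather than EMPTY.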


3. The ABA problem and its solution

Put simply: thread A reads the current value, 10. Thread B then changes the value to 11, and another thread C changes it back to 10. From thread A's point of view the value it read and the current memory value are identical, so its CAS is allowed to update. The value has in fact changed in between, so this result is logically wrong.

Note that CAS compares the value stored in the pointer, that is, an address. Suppose a thread is about to dequeue: it first sets a pointer p to the head node and then intends to execute CAS(head, p, p->next). Suppose that before this CAS runs, another thread dequeues that node, so that head != p. Normally CAS(head, p, p->next) should then return false. However, still before the CAS runs, yet another thread may enqueue a node whose memory happens to be exactly the block that p pointed to at the start, and a series of dequeue operations may then leave the head pointer pointing at that newly enqueued node. Only then does the first thread execute its CAS. The CAS that should have returned false returns true, because CAS only compares the address stored in memory, using ==.

To solve the ABA problem, version-number control information can be added. In Java, the AtomicStampedReference class attaches a version (stamp) so that it is checked together with the memory value during the comparison.
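The effect of a version number can be illustrated with a small single-threaded simulation. The sketch below assumes a 32-bit value packed together with a 32-bit version into one 64-bit atomic word; the names Stamped, VersionedCell, compare_and_set and aba_demo are hypothetical and are not taken from any library. The steps attributed to threads A, B and C are executed in sequence on one thread to keep the outcome deterministic.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct Stamped {
    std::uint32_t value;
    std::uint32_t version;
};

class VersionedCell {
    std::atomic<std::uint64_t> word_;  // high 32 bits: version, low 32 bits: value

    static std::uint64_t pack(Stamped s) {
        return (static_cast<std::uint64_t>(s.version) << 32) | s.value;
    }
    static Stamped unpack(std::uint64_t w) {
        return Stamped{static_cast<std::uint32_t>(w & 0xFFFFFFFFu),
                       static_cast<std::uint32_t>(w >> 32)};
    }

public:
    explicit VersionedCell(std::uint32_t v) : word_(pack(Stamped{v, 0u})) {}

    Stamped load() const { return unpack(word_.load()); }

    // Succeeds only if value AND version still match the snapshot `seen`;
    // every successful update bumps the version by one.
    bool compare_and_set(Stamped seen, std::uint32_t desired) {
        std::uint64_t expected = pack(seen);
        Stamped next{desired, seen.version + 1u};
        return word_.compare_exchange_strong(expected, pack(next));
    }
};

// Returns true if the plain CAS misses the A -> B -> A change
// while the versioned CAS detects it.
bool aba_demo() {
    std::atomic<int> plain(10);
    int seen = plain.load();  // "thread A" reads 10
    plain.store(11);          // "thread B" writes 11
    plain.store(10);          // "thread C" writes 10 again
    bool plain_ok = plain.compare_exchange_strong(seen, 12);  // succeeds

    VersionedCell cell(10);
    Stamped snapshot = cell.load();         // "thread A": value 10, version 0
    cell.compare_and_set(cell.load(), 11);  // "thread B": version becomes 1
    cell.compare_and_set(cell.load(), 10);  // "thread C": version becomes 2
    bool stamped_ok = cell.compare_and_set(snapshot, 12);  // fails: stale version

    return plain_ok && !stamped_ok;
}
```

For a pointer-based queue the same idea is usually applied to the head and tail pointers, pairing each pointer with a counter that is compared and updated in the same atomic operation.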





Copyright notice
This article was written by [KuoGavin]. Please include the link to the original when reposting. Thank you.
https://yzsam.com/2022/216/202208040526104107.html