深入理解无锁队列与C++原子操作

843 words

深入理解无锁队列与C++原子操作

引言

在多线程编程中,传统的锁机制(如互斥锁)虽然能保证线程安全,但存在性能瓶颈和死锁风险。无锁(Lock-Free)数据结构通过原子操作实现线程安全,能显著提升高并发场景下的性能。本文将详解如何使用C++原子变量实现无锁队列。


原子操作基础

什么是原子操作?

原子操作是不可分割的最小操作单元,要么完全执行,要么完全不执行。C++11通过<atomic>头文件提供原子类型:

1
2
3
#include <atomic>

std::atomic<int> counter(0);

内存顺序(Memory Order)

指定原子操作的内存可见性顺序,常用选项:

  • memory_order_relaxed:无顺序保证
  • memory_order_acquire:加载操作,保证之后的内存访问不会重排到前面
  • memory_order_release:存储操作,保证之前的内存访问不会重排到后面
  • memory_order_seq_cst(默认):严格顺序一致性
1
2
counter.store(42, std::memory_order_release);
int value = counter.load(std::memory_order_acquire);

无锁队列实现

环形缓冲区队列(单生产者/单消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template<typename T, size_t Size>
class LockFreeQueue {
std::array<T, Size> buffer;
std::atomic<size_t> head{0}, tail{0};

public:
bool push(const T& item) {
size_t curr_tail = tail.load(std::memory_order_relaxed);
size_t next_tail = (curr_tail + 1) % Size;

if (next_tail == head.load(std::memory_order_acquire)) {
return false; // 队列满
}

buffer[curr_tail] = item;
tail.store(next_tail, std::memory_order_release);
return true;
}

bool pop(T& item) {
size_t curr_head = head.load(std::memory_order_relaxed);
if (curr_head == tail.load(std::memory_order_acquire)) {
return false; // 队列空
}

item = buffer[curr_head];
head.store((curr_head + 1) % Size, std::memory_order_release);
return true;
}
};

链表式无锁队列(多生产者/多消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
template<typename T>
class LockFreeLinkedListQueue {
struct Node {
T data;
std::atomic<Node*> next;
Node(const T& data) : data(data), next(nullptr) {}
};

std::atomic<Node*> head;
std::atomic<Node*> tail;

public:
LockFreeLinkedListQueue() {
Node* dummy = new Node(T());
head.store(dummy, std::memory_order_relaxed);
tail.store(dummy, std::memory_order_relaxed);
}

void push(const T& data) {
Node* newNode = new Node(data);
Node* currTail;
while(true) {
currTail = tail.load(std::memory_order_acquire);
Node* next = currTail->next.load(std::memory_order_acquire);

if (currTail == tail.load(std::memory_order_relaxed)) {
if (!next) {
if (currTail->next.compare_exchange_weak(
next, newNode,
std::memory_order_release,
std::memory_order_relaxed)) {
break;
}
} else {
tail.compare_exchange_weak(currTail, next,
std::memory_order_release,
std::memory_order_relaxed));
}
}
}
tail.compare_exchange_weak(currTail, newNode,
std::memory_order_release,
std::memory_order_relaxed));
}

bool pop(T& result) {
Node* currHead;
while(true) {
currHead = head.load(std::memory_order_acquire);
Node* currTail = tail.load(std::memory_order_acquire);
Node* next = currHead->next.load(std::memory_order_acquire);

if (currHead == head.load(std::memory_order_relaxed)) {
if (currHead == currTail) {
if (!next) return false;
tail.compare_exchange_weak(currTail, next,
std::memory_order_release,
std::memory_order_relaxed));
} else {
result = next->data;
if (head.compare_exchange_weak(
currHead, next,
std::memory_order_release,
std::memory_order_relaxed)) {
delete currHead; // 实际生产环境需要安全的内存回收
return true;
}
}
}
}
}
};

关键问题与优化

ABA问题

当指针被释放后重新分配,可能导致CAS误判。解决方案:

  • 使用带标签的指针(指针+计数器)
    1
    2
    3
    4
    5
    struct TaggedPointer {
    Node* ptr;
    uintptr_t tag;
    };
    std::atomic<TaggedPointer> head;

内存回收

使用安全内存回收技术:

  • 风险指针(Hazard Pointers)
  • Epoch-based回收

为什么原子操作比加锁性能更好

主要是因为底层机制差异

锁的实现原理

  • 操作系统介入

    :互斥锁(mutex)需要内核态切换(系统调用),涉及:

    • 用户态 → 内核态上下文切换(约100-200ns)
    • 线程调度队列操作(维护等待线程列表)
    • 缓存一致性协议(MESI)维护锁变量的状态
  • 阻塞行为

    :未获取锁的线程会进入休眠状态,触发:

    • 线程上下文保存(寄存器、栈指针等)
    • 调度器唤醒延迟(通常≥1μs)

原子操作原理

  • 硬件直接支持

    :通过CPU指令实现原子性(如x86的

    1
    LOCK CMPXCHG

    • 单条指令完成读写-比较-修改(无需内核介入)
    • 缓存行锁定(通过MESI协议保证原子性)
Comments