多线程编程最难的从来不是创建线程——std::thread t(foo) 谁都会写。真正让人半夜抓头发的是:两个线程同时碰到同一块数据时,怎么让它们不打架。

这篇文章把 C++ 标准库里所有的跨线程同步工具过一遍,从最基础的 mutex 到 C++20 的 semaphore、latch、barrier,再到 Qt 框架的特有方案。每种方法都配有完整的可编译代码示例和"什么时候该用、什么时候不该用"的判断准则。


1. std::mutex + RAII 锁守卫 —— 互斥访问的基石

1.1 三种守卫

特点 使用场景
std::lock_guard 最轻量,构造时 lock,析构时 unlock,不可手动 unlock 简单临界区,scope 结束自动释放
std::unique_lock 可手动 lock/unlock,可 defer(延迟锁),可转让所有权 需要配合 condition_variable 或手动控制锁范围
std::scoped_lock C++17,可同时锁多个 mutex,避免死锁 一次需要锁多个资源

1.2 代码示例

#include <mutex>
#include <thread>
#include <vector>
#include <iostream>

class ThreadSafeCounter {
public:
    void increment() {
        std::lock_guard<std::mutex> lock(_mtx);  // 构造时加锁
        ++_value;                                  // 临界区
    }                                              // 析构时解锁

    int get() const {
        std::lock_guard<std::mutex> lock(_mtx);
        return _value;
    }

private:
    mutable std::mutex _mtx;  // mutable 允许在 const 方法中加锁
    int _value = 0;
};

int main() {
    ThreadSafeCounter counter;
    std::vector<std::thread> threads;

    for (int i = 0; i < 8; ++i) {
        threads.emplace_back([&counter] {
            for (int j = 0; j < 100000; ++j) {
                counter.increment();
            }
        });
    }
    for (auto& t : threads) t.join();
    std::cout << counter.get() << std::endl;  // 800000
}

1.3 同时锁多个 mutex(避免死锁)

std::mutex mtxA, mtxB;

// ❌ 错误:可能导致死锁
void badTransfer(int amount) {
    std::lock_guard lockA(mtxA);
    std::lock_guard lockB(mtxB);  // 如果另一个线程先锁 mtxB 再锁 mtxA → 死锁
    // ...
}

// ✅ 正确:std::scoped_lock (C++17) 或 std::lock + std::lock_guard
void safeTransfer(int amount) {
    std::scoped_lock lock(mtxA, mtxB);  // 原子性地同时锁两个,内部用 std::lock
    // ...
}

1.4 常见坑

  • 忘记 mutable:const 方法里没法对普通 mutex 加锁,编译报错
  • 锁粒度太大:在锁里做 IO 或耗时计算,其他线程全卡住
  • 异常安全:如果用 unlock() 而不是 RAII,抛异常后锁永远不释放

2. std::shared_mutex —— 读写锁

读操作多、写操作少的场景——比如配置表、缓存。shared_mutex 允许多个读者同时访问,写者独占。

读者:shared_lock → 多个读者可以同时持有
写者:unique_lock  → 写者独占,等所有读者释放
#include <shared_mutex>
#include <unordered_map>
#include <string>

class ConfigCache {
public:
    std::string get(const std::string& key) const {
        std::shared_lock lock(_mtx);  // C++14,多个线程可并发
        auto it = _data.find(key);
        return it != _data.end() ? it->second : "";
    }

    void set(const std::string& key, std::string value) {
        std::unique_lock lock(_mtx);  // 独占
        _data[key] = std::move(value);
    }

private:
    mutable std::shared_mutex _mtx;  // C++17
    std::unordered_map<std::string, std::string> _data;
};

什么时候用: 读频率远大于写(如 10:1 以上),临界区操作短。写频繁或临界区长时不适用。


3. std::atomic<T> —— 无锁同步

atomic 利用 CPU 原子指令(如 x86 的 LOCK CMPXCHG),无需互斥锁。核心优势:不会阻塞,没有上下文切换开销

3.1 内存序

memory_order_relaxed      ← 最宽松,仅保证原子性
memory_order_acquire      ← 读屏障,后续操作不能重排到此之前
memory_order_release      ← 写屏障,之前操作不能重排到此之后
memory_order_acq_rel      ← 读+写屏障
memory_order_seq_cst      ← 最严格,全局顺序一致性(默认值)

实战建议: 默认用 seq_cst;确定性能瓶颈再考虑放松到 acquire/release;只有计数器用 relaxed

3.2 常用模式

// 模式 1:无锁计数器
std::atomic<int> counter{0};
void count() {
    for (int i = 0; i < 100000; ++i)
        counter.fetch_add(1, std::memory_order_relaxed);
}

// 模式 2:自旋锁
class SpinLock {
    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
public:
    void lock()   { while (_flag.test_and_set(std::memory_order_acquire)); }
    void unlock() { _flag.clear(std::memory_order_release); }
};

// 模式 3:CAS 循环(Compare-And-Swap)
void atomicMax(std::atomic<int>& target, int value) {
    int old = target.load();
    while (value > old && !target.compare_exchange_weak(old, value)) {
        // old 会被更新为当前值,继续重试
    }
}

// 模式 4:发布-订阅(Release-Acquire)
std::atomic<bool> ready{false};
int data = 0;

void publisher() {
    data = 42;
    ready.store(true, std::memory_order_release);  // data=42 对所有 acquire 可见
}

void subscriber() {
    while (!ready.load(std::memory_order_acquire));
    std::cout << data << std::endl;  // 一定输出 42
}

3.3 ABA 问题

线程 1:读到 A,准备 CAS(A→C)
线程 2:CAS(A→B),然后 CAS(B→A)
线程 1:CAS 成功!但中间 A 已经不是原来的 A 了

解法:给值带个版本号,或使用 std::atomic<std::shared_ptr<T>> (C++20)。


4. std::condition_variable —— 等待与通知

mutex 解决"互斥”,condition_variable 解决"等待某个条件成立”。

template<typename T>
class ThreadSafeQueue {
public:
    void push(T value) {
        {
            std::lock_guard lock(_mtx);
            _queue.push(std::move(value));
        }
        _cv.notify_one();  // 唤醒一个等待的消费者
    }

    T pop() {
        std::unique_lock lock(_mtx);
        _cv.wait(lock, [this] { return !_queue.empty(); });  // 带 predicate 防虚假唤醒
        T value = std::move(_queue.front());
        _queue.pop();
        return value;
    }

    std::optional<T> tryPop(std::chrono::milliseconds timeout) {
        std::unique_lock lock(_mtx);
        if (_cv.wait_for(lock, timeout, [this] { return !_queue.empty(); })) {
            T value = std::move(_queue.front());
            _queue.pop();
            return value;
        }
        return std::nullopt;
    }

private:
    std::queue<T> _queue;
    std::mutex _mtx;
    std::condition_variable _cv;
};

关键注意点:

  • 必须用 std::unique_lock(不能是 lock_guard)——wait 内部需要 unlock/lock
  • 必须带 predicate 防止虚假唤醒
  • notify 不需要持锁——在 unlock 之后 notify 通常更高效

5. std::future / std::promise —— 一次性结果传递

// 简单用法:std::async 自动管理
auto future = std::async(std::launch::async, compute, std::vector<int>{1,2,3,4,5});
int result = future.get();  // 等待结果,只可调用一次

// 高级用法:promise + future 手动配对
std::promise<std::string> promise;
std::future<std::string> future = promise.get_future();

std::thread worker([&promise] {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    promise.set_value("计算结果");
    // 或:promise.set_exception(std::make_exception_ptr(std::runtime_error("失败")));
});

std::cout << future.get() << std::endl;  // 阻塞直到有结果
worker.join();

局限: 只能 set/get 一次,不能 cancel,future::get() 只能调用一次。


6. C++20 新增:semaphorelatchbarrier

6.1 std::counting_semaphore — 信号量

控制同时访问资源的线程数上限。

std::counting_semaphore<3> dbSemaphore{3};  // 最多 3 个线程

void databaseQuery(int id) {
    dbSemaphore.acquire();        // 计数器 -1
    // 临界区:最多 3 个线程同时在这里
    dbSemaphore.release();        // 计数器 +1
}

std::binary_semaphore sem{1};  // 等价于 counting_semaphore<1>

6.2 std::latch — 一次性同步点

std::latch loadComplete{3};
for (int i = 0; i < 3; ++i) {
    std::thread([&loadComplete, i] {
        loadFile(i);
        loadComplete.count_down();  // 原子减 1
    }).detach();
}
loadComplete.wait();  // 阻塞直到计数器归零

6.3 std::barrier — 可复用的同步点

auto onPhaseComplete = [&]() noexcept { std::cout << "阶段完成" << std::endl; };
std::barrier syncPoint(4, onPhaseComplete);  // 4 个线程,每阶段结束调用回调

auto worker = [&](int id) {
    for (int phase = 0; phase < 3; ++phase) {
        data[id] = phase * 10 + id;
        syncPoint.arrive_and_wait();  // 等所有线程到达
    }
};

7. Qt 框架的同步方案

7.1 QMutex + QMutexLocker

class QtCounter {
public:
    void increment() {
        QMutexLocker locker(&_mtx);  // RAII,等价于 std::lock_guard
        ++_value;
    }
    int get() const {
        QMutexLocker locker(&_mtx);
        return _value;
    }
private:
    mutable QMutex _mtx;
    int _value = 0;
};

7.2 QReadWriteLock — 读写锁

QReadLocker locker(&_rwLock);   // 读锁
QWriteLocker locker(&_rwLock);  // 写锁

7.3 Qt 信号槽跨线程 — 最优雅的方案

你完全不需要锁——Qt 帮你把数据搬运到目标线程。

Worker* worker = new Worker;
worker->moveToThread(workerThread);

// QueuedConnection → 跨线程队列调用,零锁
QObject::connect(sender, &Sender::startWork,
                 worker, &Worker::doWork,
                 Qt::QueuedConnection);  // 调用自动进入 workerThread

QObject::connect(worker, &Worker::workFinished,
                 receiver, &Receiver::onResult,
                 Qt::QueuedConnection);  // 结果回到主线程
连接方式 行为
Qt::DirectConnection 槽在发射信号的线程执行
Qt::QueuedConnection 槽排入接收者所在线程的事件队列
Qt::AutoConnection 同线程→Direct,跨线程→Queued(默认)

绝对不能跨线程直接操作 QWidget——所有 UI 操作必须在主线程。

7.4 QAtomicInteger — Qt 的无锁原子操作

QAtomicInteger<int> _ref{0};
void ref()  { _ref.fetchAndAddOrdered(1); }
bool deref() { return _ref.fetchAndSubOrdered(1) == 1; }

8. 方法选型速查

场景 推荐方案 理由
简单临界区 std::mutex + lock_guard 最轻量,最清晰
读多写少 std::shared_mutex 读者不互斥
计数器/标志位 std::atomic<T> 无锁,无阻塞
等待条件 std::condition_variable 标准的生产消费模式
一次性结果 std::future / std::promise 语义清晰,自动等待
限流/线程池 std::counting_semaphore (C++20) 信号量标准方案
并行阶段同步 std::barrier (C++20) 可复用同步点
线程生命周期同步 std::latch (C++20) 一次性倒计时
Qt 跨线程 UI Qt::QueuedConnection 零锁,自动线程安全
Qt 读写锁 QReadWriteLock 比 STL 更早支持,久经考验

9. 常见死锁模式与预防

模式 示例 解法
顺序死锁 A锁m1→m2, B锁m2→m1 全局统一锁顺序;std::scoped_lock
递归死锁 同一线程重复 lock 非递归 mutex std::recursive_mutex;重构消除递归
条件变量死锁 先 notify 再 wait,通知丢失 条件检查和 wait 必须在同一个锁内
锁粒度过大 锁内做网络 IO 只在访问共享数据时加锁,IO 在锁外

10. 性能考量

操作 延迟(大约)
std::atomic::fetch_add (uncontended) ~1 ns
std::mutex::lock (uncontended) ~20 ns
std::shared_mutex::lock_shared ~30 ns
std::mutex::lock (contended) ~2-10 µs(含上下文切换)

策略: 减少竞争 > 减少锁数量。数据分片、读写分离、RCU 是常用手段。


总结

三条黄金法则:

  1. 有共享数据就加锁——没有侥幸
  2. 锁的粒度要小——IO 绝对不能在锁里
  3. 想不清楚就用 std::scoped_lock——同时锁多个资源时永远不会死锁

Qt 项目额外记住一条:能走 QueuedConnection 就不走手动锁。