Thread pool с блокирующей очередью¶
Создание потока стоит дорого. На Linux pthread_create выливается в clone(2), выделение стека (обычно 8 MB виртуального адресного пространства), регистрацию TLS, инициализацию struct task_struct в ядре — порядка 50–100 µs на стандартной системе. Для веб-сервера, обслуживающего 10 000 коротких HTTP-запросов в секунду, спавн потока на каждый запрос быстро становится дороже самой обработки.
Thread pool — фиксированное (или ограниченно растущее) число заранее запущенных потоков, разбирающих задачи из общей очереди. Стоимость создания потока амортизируется по всему времени жизни процесса, а контекст-свитчи и cache pressure ограничены количеством воркеров.
Зачем pool, а не spawn-per-task¶
Стоимость spawn-per-task складывается из четырёх компонентов:
| Компонент | Стоимость | Природа |
|---|---|---|
clone(2) + setup |
30–100 µs | syscall + копирование task_struct |
| Стек | 8 MB VM + 4 KB resident | mmap(MAP_STACK), первая page fault коммитит память |
| TLS | несколько µs | копирование tbss, регистрация в линкере |
| TLB / scheduler load | непрямая | больше потоков → больше context switch, больше TLB-miss |
Pool с N воркерами платит это один раз при старте. Дальнейший submit task стоит ровно одну операцию над очередью + потенциальный FUTEX_WAKE, что укладывается в сотни наносекунд.
Дополнительные эффекты пула:
- Backpressure: bounded queue естественно ограничивает скорость прихода задач, защищая систему от OOM.
- Locality: повторное использование тех же потоков сохраняет тёплый L1/L2 cache и не сбрасывает TLB.
- Предсказуемость latency: число потоков ≈ числу CPU, scheduler не overcommit'ит ядра.
Архитектура¶
submit(task)
│
▼
┌─────────────────────────┐
│ blocking queue │
│ ┌───┬───┬───┬───┬───┐ │
│ │ T │ T │ T │ T │ │ │ ← FIFO
│ └───┴───┴───┴───┴───┘ │
│ head tail │
└────────────┬────────────┘
│ pop() (блокирует если пусто)
┌──────────────┬─────────┴──────────┬──────────────┐
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ worker 0 │ │ worker 1 │ │worker N-2│ │worker N-1│
│ │ │ │ . . . │ │ │ │
│ loop: │ │ loop: │ │ loop: │ │ loop: │
│ pop │ │ pop │ │ pop │ │ pop │
│ exec │ │ exec │ │ exec │ │ exec │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Поток управления:
submit(task)захватывает mutex очереди, помещает task в хвост, делаетnotify_one.- Один из спящих воркеров просыпается на cv, проверяет предикат, забирает task из головы, отпускает mutex.
- Воркер выполняет task вне mutex'а — пока он работает, другие воркеры разбирают остаток очереди параллельно.
- По завершении task воркер возвращается к
pop(). shutdownпомечает очередь closed; воркеры дочёрпывают оставшиеся задачи и выходят; destructor join'ит их.
Блокирующая очередь¶
Сердце pool'а — concurrent queue с двумя свойствами: pop блокирует поток до появления элемента, и есть способ корректно завершить работу. Минимальная реализация на std::mutex + std::condition_variable:
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
template <typename T>
class BlockingQueue {
std::mutex m;
std::condition_variable cv;
std::queue<T> q;
bool closed = false;
public:
void push(T item) {
{
std::lock_guard<std::mutex> lock(m);
if (closed) return; // молча отбросить, либо throw
q.push(std::move(item));
}
cv.notify_one(); // разбудить ровно одного воркера
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&] { return !q.empty() || closed; });
if (q.empty()) return std::nullopt; // closed && empty → "очередь мертва"
T item = std::move(q.front());
q.pop();
return item;
}
void close() {
{
std::lock_guard<std::mutex> lock(m);
closed = true;
}
cv.notify_all(); // разбудить всех — пусть выходят
}
};
Несколько решений в этом коде заслуживают комментария.
notify_one vs notify_all. В push будит ровно один поток: если он один разбирает один элемент, будить остальных бессмысленно — они проснутся, увидят пустую очередь и снова заснут (thundering herd, см. ниже). В close нужен notify_all, потому что все воркеры обязаны проснуться и заметить завершение.
unique_lock в pop. cv.wait атомарно освобождает mutex и засыпает, а при пробуждении захватывает его обратно. lock_guard так не умеет — у него нет операций lock/unlock без RAII.
Предикат в wait. wait(lock, predicate) эквивалентен циклу while(!pred()) cv.wait(lock);. Цикл защищает от spurious wakeups (ядро может разбудить поток без сигнала) и от ситуации, когда между notify и пробуждением другой воркер уже забрал элемент.
std::optional<T> как возвращаемое значение. pop возвращает nullopt только если очередь закрыта и пуста. Это даёт воркеру явный сигнал на выход без exception'ов и без sentinel-задачи в очереди.
Освобождение mutex'а перед notify_one. Скобки { ... } вокруг lock_guard существенны: notify под держимым mutex'ом приводит к лишнему пробуждению-и-сну, потому что разбуженный поток сразу упрётся в захваченный mutex. POSIX и C++ Standard оба разрешают notify без удержания, и это эффективнее.
Состояния очереди:
stateDiagram-v2
[*] --> OPEN_EMPTY
OPEN_EMPTY: OPEN + EMPTY
OPEN_NONEMPTY: OPEN + NON-EMPTY
CLOSED_NONEMPTY: CLOSED + NON-EMPTY
CLOSED_EMPTY: CLOSED + EMPTY
OPEN_EMPTY --> OPEN_NONEMPTY: push
OPEN_NONEMPTY --> OPEN_EMPTY: pop (last)
OPEN_NONEMPTY --> OPEN_NONEMPTY: push / pop
OPEN_EMPTY --> CLOSED_EMPTY: close()
OPEN_NONEMPTY --> CLOSED_NONEMPTY: close()
CLOSED_NONEMPTY --> CLOSED_EMPTY: drain (pop all)
CLOSED_EMPTY --> [*]: pop → nullopt,<br/>воркер выходит
Простейший Thread Pool¶
#include <atomic>
#include <functional>
#include <thread>
#include <vector>
class ThreadPool {
std::vector<std::thread> workers;
BlockingQueue<std::function<void()>> queue;
public:
explicit ThreadPool(size_t n) {
workers.reserve(n);
for (size_t i = 0; i < n; ++i) {
workers.emplace_back([this] {
while (auto task = queue.pop()) {
try {
(*task)();
} catch (...) {
// глотаем — иначе terminate уничтожит весь pool
}
}
});
}
}
template <typename F>
void submit(F&& task) {
queue.push(std::function<void()>(std::forward<F>(task)));
}
~ThreadPool() {
queue.close(); // разбудить всех воркеров
for (auto& w : workers)
if (w.joinable()) w.join(); // дождаться завершения
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
};
Воркер крутит while (auto task = queue.pop()) — pop() возвращает nullopt, когда очередь закрыта и пуста, цикл ломается, поток завершается. Это и есть graceful shutdown: оставшиеся в очереди задачи дочёрпываются, новые push после close молча игнорируются, никаких задач не теряется и никаких deadlock'ов нет.
std::function<void()> стирает тип лямбды или функтора, передаваемого в submit. Это позволяет хранить разнородные задачи в одной очереди. Цена — type erasure через указатель + потенциальная heap-аллокация для больших captures (small-buffer optimization обычно покрывает захват одного-двух указателей).
Рабочий пример
Полная компилируемая реализация thread pool с блокирующей очередью и submit, возвращающим std::future: examples/q05_thread_pool/thread_pool.cpp — собрать и запустить: cd examples && make q05 && ./bin/q05_thread_pool.
try { ... } catch (...) в воркере критичен: исключение, выпавшее из task(), разрушит std::thread, что в деструкторе превратится в std::terminate и уронит весь процесс. Что делать с пойманным исключением — отдельный вопрос; для submit'ов с результатом ответ даёт std::packaged_task.
submit с возвратом результата¶
std::function<void()> не возвращает значения. Чтобы вернуть результат и пробросить исключение в вызывающий поток, оборачиваем callable в std::packaged_task, которая по get_future() отдаёт std::future (см. Future/Promise):
#include <future>
#include <memory>
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>>
{
using R = std::invoke_result_t<F, Args...>;
auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<R()>>(std::move(bound));
std::future<R> result = task->get_future();
queue.push([task] { (*task)(); }); // shared_ptr копируется в лямбду
return result;
}
Использование:
ThreadPool pool(4);
auto f1 = pool.submit([](int a, int b) { return a + b; }, 2, 3);
auto f2 = pool.submit([] { throw std::runtime_error("boom"); });
std::cout << f1.get() << "\n"; // 5
try { f2.get(); }
catch (const std::exception& e) { std::cout << e.what() << "\n"; }
Два неочевидных момента в реализации.
Почему shared_ptr<packaged_task>, а не задача целиком в лямбде? packaged_task move-only — её нельзя положить внутрь std::function, потому что std::function требует CopyConstructible Callable (исторический долг C++11; в C++23 появился std::move_only_function, который этого не требует). shared_ptr делает обёртку копируемой, при этом сам packaged_task остаётся внутри одним экземпляром.
Кто держит packaged_task до выполнения? Лямбда захватывает shared_ptr по копии — это последний живой указатель, пока task не выполнена. После (*task)() лямбда уничтожается, shared_ptr отпускает task, future уже получила результат через свой shared state.
Узкие места¶
sub ─▶┐ ┌────────────────────────┐ ┌─▶ worker 0
sub ─▶┤ │ │ ├─▶ worker 1
sub ─▶┼────▶│ глобальный mutex │────▶┼─▶ worker 2
sub ─▶┤ │ + одна cache line │ ├─▶ worker 3
sub ─▶┘ └────────────────────────┘ └─▶ worker N-1
▲
│
узкое место:
все ходят через
эту дверь по одному
Простой pool отлично работает до ~10⁵ задач в секунду. На большем потоке вылезают три проблемы.
Mutex contention. Все submit и все pop крутятся через один std::mutex. На 16 потоках, борющихся за один mutex, доля времени, проведённого в slow-path futex, растёт нелинейно. Профилировщик показывает __lll_lock_wait в топе.
Cache ping-pong. head и tail указатели std::queue (внутри обёрнутого std::deque) лежат рядом в одной cache line. Поток, делающий push, дёргает её на свой L1 (MESI: M state), и каждый поток, делающий pop, инвалидирует свой кэш. На 16 потоках с большим RPS cache line летает между ядрами тысячи раз в секунду — отдельные ассемблерные инструкции lock стоят сотни ns вместо единиц.
Thundering herd. Если бы push делал notify_all вместо notify_one, все N воркеров проснулись бы на каждый task. N−1 из них упёрлись бы в захваченный mutex, заснули обратно — лишних 2N context switch'ей на каждый task. notify_one это убирает, но не решает первые две проблемы.
Что делать:
- Sharded queue: M очередей, submit round-robin'ом или по хэшу caller'а — уменьшает contention в M раз.
- Per-CPU queue: воркер всегда читает из «своей» очереди, cache line почти не мигрирует.
- Lock-free queue: убирает mutex как primitive contention point.
- Work-stealing: combo per-worker queue + кража, когда своя пуста.
Work-stealing¶
Work-stealing — стандарт для CPU-bound workload в современных runtime'ах (Intel TBB, Cilk, Java ForkJoinPool, Tokio, Go runtime). Каждый воркер имеет свою deque (double-ended queue); владелец работает с одним концом, воры — с другим.
worker 0 own deque worker 1 own deque
┌────────────────────────┐ ┌────────────────────────┐
│ T7 T6 T5 T4 T3 T2 │ │ (empty) │
│ ▲ ▲ │ │ │
│ │ │ │ └───────────┬────────────┘
│ │ steal from top │ │ │
│ │ (FIFO, thieves) │ │ │ своя пуста
│ │ │ │ ▼
│ │ push/pop│ │ ┌────────────────────────────────┐
│ │ (LIFO, │ │ │ выбрать случайную жертву, │
│ │ by owner│ │ │ peek верхушку её deque, │
│ │ │ │ │ если непусто — CAS-украсть │
└────────────────────────┘ └────────────────────────────────┘
Ключевые свойства:
- Owner работает LIFO снизу. Свежее задание оказывается рядом с тёплым cache; recursive parallelism (родительская task ждёт результата от child'ов, которые она же положила) распаковывается естественным DFS.
- Воры крадут FIFO сверху. Снизу деки — «горячий» хвост, активно мигрирующий между cache lines владельца; сверху — старые задачи, которые владелец трогать перестал, их можно унести без false sharing.
- Конкурируют только владелец и воры. Между двумя ворами одной очереди — да, но эта конкуренция редкая, и решается классическим Chase-Lev deque (lock-free deque с CAS только на верхнем конце).
- Если своя пуста, владелец сам идёт воровать. Стратегия выбора жертвы — обычно случайная (uniform random); это даёт хорошее распределение нагрузки без глобального координатора.
Сравнение с глобальной FIFO-очередью:
| Свойство | Global FIFO queue | Work-stealing |
|---|---|---|
| Contention point | один mutex | в основном per-worker, редкая на краже |
| Cache locality | плохая (миграция между CPU) | отличная (LIFO в hot cache) |
| Fairness | строгая FIFO | приоритет «своих» задач |
| Latency для new task | низкая (next worker берёт сразу) | может ждать кражи, если воркер занят |
| Подходит для | I/O-bound, mixed | CPU-bound, fork-join parallelism |
Work-stealing проигрывает global queue по latency на единичных задачах в idle-pool'е (вор должен «обнаружить» новую работу через random probing), но выигрывает на throughput при насыщении.
Lock-free queue¶
Альтернатива mutex'у — MPMC (multi-producer, multi-consumer) lock-free queue. Классическая основа — Michael-Scott queue (1996, см. Lock-free структуры) — связный список с CAS на хвосте для enqueue и на голове для dequeue.
// Концептуально, без ABA-защиты:
struct Node { T value; std::atomic<Node*> next; };
std::atomic<Node*> head, tail;
void enqueue(T v) {
Node* n = new Node{std::move(v), nullptr};
Node* old_tail;
do {
old_tail = tail.load();
} while (!tail.compare_exchange_weak(old_tail, n));
old_tail->next.store(n);
}
Реальный production-grade Node обвешан hazard pointers или epoch-based reclamation, чтобы избежать use-after-free при удалении узлов. ABA-проблема (CAS видит то же значение, но память переиспользована) решается tagged pointers или той же epoch reclamation.
Готовые библиотеки:
| Библиотека | Особенности |
|---|---|
moodycamel::ConcurrentQueue |
bounded и unbounded, batched ops, header-only |
boost::lockfree::queue |
strict lock-free, bounded, требует trivially copyable T |
folly::MPMCQueue |
bounded, ticket-based, очень быстрая на высокой нагрузке |
rigtorp::SPSCQueue |
single-producer single-consumer, нулевая контенция |
Tradeoff: lock-free сложнее, агрессивно нагружает coherency protocol (CAS — это атомарная транзакция, требующая monopoly на cache line), и в low-contention режиме часто медленнее обычного mutex'а. Берут lock-free тогда, когда профиль показывает mutex как bottleneck, либо когда нужны wait-free гарантии для realtime.
Sizing thread pool¶
Число воркеров — единственный параметр, влияющий на performance больше всего. Универсального ответа нет, потому что оптимум зависит от природы задач.
CPU-bound. Если task насыщает CPU (вычисления, парсинг, компрессия), оптимум — около числа доступных аппаратных потоков:
Иногда берут N_CPU + 1 — лишний поток покрывает редкие блокировки на page fault. Больше потоков, чем CPU, добавляет context switching overhead без прироста throughput.
Hyperthreading nuance. На SMT процессоре hardware_concurrency() возвращает число логических CPU (обычно 2× физических). Для чистого CPU-bound workload SMT даёт ~1.2–1.4× throughput, не 2× — два логических ядра делят execution units. Для memory-bound workload эффект больше, потому что один поток ждёт RAM, второй использует ALU. Для AVX-heavy кода SMT часто вреден.
I/O-bound. Если task блокируется на syscall (read, write, recv), CPU простаивает. Формула Little's Law:
Task, проводящий 90% времени в read() и 10% в обработке, нуждается в N_cpu × 10 потоках, чтобы насытить CPU. На практике это означает, что для типичного веб-бэкенда с базами данных и внешними API пулы из 50–200 потоков — норма.
Адаптивные пулы. Java ForkJoinPool (на work-stealing) автоматически спавнит compensation threads, когда воркер блокируется в ManagedBlocker. Это даёт лучшее обоих миров: малое базовое число потоков для CPU-bound фазы, эластичный рост при блокировках. Цена — сложность реализации.
Современная альтернатива — отделить blocking I/O от CPU-bound pool'а: маленький CPU-pool (N_CPU воркеров), отдельный blocking pool (десятки воркеров) для syscall'ов. Так делают Tokio (spawn_blocking) и Java Loom.
Реальные thread pools¶
| Реализация | Особенности |
|---|---|
boost::asio::thread_pool |
FIFO global queue, интеграция с executor framework |
Intel TBB task_scheduler_init |
work-stealing, task DAG, blocking-aware |
| OpenMP runtime | за #pragma omp parallel, общий pool на process |
| libuv thread pool | Node.js: blocking syscalls (fs, dns); по умолчанию 4 потока |
std::async(std::launch::async, …) |
реализация-зависимая: GCC спавнит поток на вызов, MSVC — pool |
Java ForkJoinPool |
work-stealing, compensation threads, parallelStream |
Java Executors.newFixedThreadPool |
классический FIFO, под капотом — LinkedBlockingQueue |
| Go runtime (M:N scheduler) | goroutines на work-stealing pool, нет явного user-facing API |
| Rust Tokio multi-thread runtime | work-stealing + spawn_blocking для I/O |
std::async(std::launch::async) в GCC до сих пор спавнит честный std::thread на каждый вызов — никакого pool'а под капотом. Если нужна реальная пуловая семантика, либо MSVC, либо своя реализация поверх std::packaged_task (как выше). Это поведение implementation-defined, поэтому не на что полагаться в кросс-платформенном коде.
Подводные камни¶
Recursive submit с bounded queue. Воркер выполняет task, та сабмитит подзадачу в тот же pool и ждёт её результат. Если очередь полна и submit блокирует, а все воркеры висят на future.get() — deadlock. Решения: unbounded queue, отдельный pool для подзадач, или work-stealing с inlining (TBB так умеет — если task ждёт sub-task, текущий воркер выполняет sub-task сам, не отдавая в очередь).
Непойманные исключения. Если task() бросает наружу из лямбды воркера и нет try { ... } catch (...), поток std::thread завершится с активным исключением → terminate всего процесса. Поэтому либо submit через packaged_task (исключение уходит в future), либо явный catch в worker loop.
Незавершённый shutdown. Если деструктор pool'а не дождётся join'ов, лямбды воркеров могут продолжить выполняться после смерти захваченных this-ссылок → use-after-free. ~ThreadPool обязан вызвать close() и join() для каждого worker'а.
Stack overflow в task. Стек воркера фиксирован (pthread_attr_setstacksize, по умолчанию 8 MB). Рекурсивная task с глубоким стеком уронит только этот воркер в SIGSEGV — остальные продолжат работу, но pool потеряет один поток и не заметит этого. Health-check на число живых воркеров обычно никто не делает; диагностика приходит из логов в виде «pool deadlocked».
Длинные task блокируют pool. Single long-running task занимает воркер на минуты, остальные задачи ждут. Если N задач длиннее N_workers — pool вообще перестаёт прогрессировать. Стандартная защита — таймауты внутри task, или разделение pool'ов по latency-классам (short vs long).
Mutex-захват в task'е и notify под mutex'ом. Task внутри захватывает тот же mutex, что и BlockingQueue — гарантированный deadlock. На практике это редко случается напрямую, но косвенно через recursive submit + future — постоянно.
Связанные темы¶
- Синхронизация: мьютексы, семафоры, futex — механика mutex, condition variable, futex fast/slow path
- Future/Promise — async submit с возвратом результата через
packaged_task - Atomic операции и memory model — основа для lock-free очередей
- Lock-free структуры — Michael-Scott queue, ABA, hazard pointers
- Потоки (основы) — стоимость
pthread_create, размер стека, TLS
Источники¶
- Anthony Williams, C++ Concurrency in Action, 2nd ed., Chapter 9 — реализация thread pool, work-stealing, task continuations
- Maurice Herlihy, Nir Shavit, The Art of Multiprocessor Programming — формальная модель work-stealing, Chase-Lev deque
- Maged M. Michael, Michael L. Scott, Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms (PODC '96)
- Robert D. Blumofe, Charles E. Leiserson, Scheduling Multithreaded Computations by Work Stealing (FOCS '94)
- Intel TBB documentation — task scheduler, work-stealing
- Boost.Asio source:
thread_pool - moodycamel ConcurrentQueue design notes
man 3 pthread_create,man 3 pthread_cond_wait