Boosting C++ Thread Pool Performance: Queue and Memory Optimizations

This article explains why C++ thread pools often hit bottlenecks in high‑concurrency scenarios and provides practical techniques—such as lock‑granularity tuning, lock‑free queues, object‑pool reuse, and pre‑allocation—to improve task scheduling and memory management without relying on external frameworks.


Thread‑Pool Overview

A C++ thread pool keeps a fixed set of pre‑created worker threads that wait for tasks. Submitting a task enqueues it and wakes a waiting thread, avoiding the overhead of creating and destroying threads for each short‑lived job.

Core Components

Thread container : std::vector<std::thread> threads;

Task queue : typically std::queue<std::function<void()>> tasks; protected by a std::mutex and a std::condition_variable.

Mutex : ensures exclusive access to the task queue.

Condition variable : lets workers block when the queue is empty and wake when new tasks arrive.

Task Submission

template<class F, class... Args>
void ThreadPool::enqueue(F&& f, Args&&... args) {
    auto task = std::make_unique<Task>(std::forward<F>(f), std::forward<Args>(args)...);
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.push(std::move(task));
    }
    condition.notify_one();
}

Worker Loop

void ThreadPool::worker() {
    while (true) {
        std::unique_ptr<Task> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this]{ return stop || !tasks.empty(); });
            if (stop && tasks.empty()) return;
            task = std::move(tasks.front());
            tasks.pop();
        }
        task->execute();
    }
}

Memory‑Management Techniques

Object‑pool reuse : pre‑allocate a set of objects and recycle them to avoid repeated new/delete.

class ObjectPool {
public:
    // Pre-allocate an initial batch of objects.
    explicit ObjectPool(size_t init) {
        for (size_t i = 0; i < init; ++i) pool.push(std::make_shared<MyObject>());
    }
    // Hand out a pooled object; fall back to a fresh allocation when empty.
    std::shared_ptr<MyObject> getObject() {
        if (pool.empty()) return std::make_shared<MyObject>();
        auto obj = pool.front();
        pool.pop();
        return obj;
    }
    // Recycle an object for later reuse.
    void returnObject(std::shared_ptr<MyObject> obj) { pool.push(std::move(obj)); }
private:
    std::queue<std::shared_ptr<MyObject>> pool;  // not thread-safe: guard with a mutex for concurrent use
};

Pre‑allocation : reserve capacity for containers to avoid reallocations.

std::vector<int> vec;
vec.reserve(10000);                         // one allocation up front
for (int i = 0; i < 10000; ++i) vec.push_back(i);

Leak detection : use Valgrind or AddressSanitizer.

valgrind --tool=memcheck --leak-check=yes ./test
g++ -fsanitize=address -g -o test test.cpp

Typical Runtime Scenarios

With three worker threads:

If ≤3 tasks are submitted, each is executed immediately.

If more tasks arrive, excess tasks wait in the queue.

When the queue is unbounded it can grow indefinitely; a bounded queue makes the producer block when full.
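The bounded case can be sketched with a second condition variable that makes the producer block while the queue is full. `BoundedQueue` below is an illustrative standalone class, not a member of the pool above:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Illustrative bounded queue: push() blocks while full, pop() blocks while empty.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(size_t cap) : capacity(cap) {}
    void push(T v) {
        std::unique_lock<std::mutex> lk(m);
        notFull.wait(lk, [this]{ return q.size() < capacity; });  // producer blocks when full
        q.push(std::move(v));
        notEmpty.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lk(m);
        notEmpty.wait(lk, [this]{ return !q.empty(); });          // consumer blocks when empty
        T v = std::move(q.front());
        q.pop();
        notFull.notify_one();
        return v;
    }
private:
    const size_t capacity;
    std::queue<T> q;
    std::mutex m;
    std::condition_variable notFull, notEmpty;
};
```

Wiring this into the pool would replace the unbounded `tasks` member and turn `enqueue` into a potentially blocking call, which is usually the desired back-pressure behavior.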

Implementation Steps

1. Task class

class Task {
public:
    template<class F, class... Args>
    Task(F&& f, Args&&... args)
        : func(std::bind(std::forward<F>(f), std::forward<Args>(args)...)) {}
    void execute() { if (func) func(); }
private:
    std::function<void()> func;
};

2. Thread‑pool class skeleton

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();
    template<class F, class... Args> void enqueue(F&& f, Args&&... args);
private:
    void worker();
    std::vector<std::thread> threads;
    std::queue<std::unique_ptr<Task>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop = false;
};

3. Constructor / Destructor

ThreadPool::ThreadPool(size_t n) : stop(false) {
    for (size_t i=0;i<n;++i) {
        threads.emplace_back([this]{ this->worker(); });
    }
}
ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (auto &th : threads) th.join();
}

4. Enqueue and worker (see sections above)

Full Reference Implementation (C++11)

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>

class ThreadPool {
public:
    explicit ThreadPool(size_t thread_num) : stop(false) {
        for (size_t i=0;i<thread_num;++i) {
            workers.emplace_back([this]{
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mtx);
                        cv.wait(lock, [this]{ return stop || !tasks.empty(); });
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (auto &w : workers) if (w.joinable()) w.join();
    }
    template<typename F, typename... Args>
    void enqueue(F&& f, Args&&... args) {
        auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::unique_lock<std::mutex> lock(mtx);
            if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace(std::move(bound));
        }
        cv.notify_one();
    }
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<bool> stop;
};

// Example usage
void test_task(int id, const std::string& msg) {
    std::cout << "Task " << id << ": " << msg
              << " (Thread " << std::this_thread::get_id() << ")" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

int main() {
    ThreadPool pool(4);
    for (int i=0;i<10;++i) pool.enqueue(test_task, i, "Hello ThreadPool");
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

Advanced Optimizations (Illustrative)

Replace the protected std::queue with a lock‑free queue (e.g., boost::lockfree::queue) to eliminate lock contention.
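Before adopting an external dependency, the underlying idea can be illustrated with a minimal single-producer/single-consumer ring buffer built on std::atomic. This is a sketch only (it supports exactly one producer and one consumer, and wastes one slot to distinguish full from empty); production code should prefer a proven implementation such as boost::lockfree::spsc_queue:

```cpp
#include <array>
#include <atomic>
#include <cstddef>

// Minimal SPSC lock-free ring buffer (capacity is N-1; one slot stays empty).
template <typename T, size_t N>
class SpscQueue {
public:
    bool push(T v) {
        size_t t = tail.load(std::memory_order_relaxed);
        size_t next = (t + 1) % N;
        if (next == head.load(std::memory_order_acquire)) return false;  // full
        buf[t] = std::move(v);
        tail.store(next, std::memory_order_release);  // publish the slot
        return true;
    }
    bool pop(T& out) {
        size_t h = head.load(std::memory_order_relaxed);
        if (h == tail.load(std::memory_order_acquire)) return false;     // empty
        out = std::move(buf[h]);
        head.store((h + 1) % N, std::memory_order_release);  // free the slot
        return true;
    }
private:
    std::array<T, N> buf;
    std::atomic<size_t> head{0}, tail{0};
};
```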

Introduce priority scheduling by storing tasks in a std::priority_queue and dispatching higher‑priority jobs first.
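A minimal sketch of the queue-side change, assuming a hypothetical PrioritizedTask wrapper (the pool's `tasks` member would change to this queue type):

```cpp
#include <functional>
#include <queue>
#include <vector>

// Hypothetical task wrapper: a priority plus the callable to run.
struct PrioritizedTask {
    int priority;                 // larger value runs first
    std::function<void()> run;
};

// Comparator for std::priority_queue (which is a max-heap by default).
struct ByPriority {
    bool operator()(const PrioritizedTask& a, const PrioritizedTask& b) const {
        return a.priority < b.priority;
    }
};

using PriorityTaskQueue =
    std::priority_queue<PrioritizedTask, std::vector<PrioritizedTask>, ByPriority>;
```

The worker loop stays the same except that it takes `tasks.top()` instead of `tasks.front()` before popping.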

Shard the task queue into multiple sub‑queues each protected by its own mutex to reduce global lock contention.
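One way to sketch this (ShardedQueue, try_pop, and the round-robin counter are illustrative names, not part of the pool above):

```cpp
#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

// Illustrative sharded task queue: each shard has its own mutex, so producers
// landing on different shards never contend for the same lock.
class ShardedQueue {
public:
    explicit ShardedQueue(size_t shards) : qs(shards), ms(shards) {}
    void push(std::function<void()> t) {
        size_t i = next.fetch_add(1) % qs.size();   // round-robin shard selection
        std::lock_guard<std::mutex> lk(ms[i]);
        qs[i].push(std::move(t));
    }
    // A worker scans from its "home" shard first, then checks the others.
    bool try_pop(size_t worker, std::function<void()>& out) {
        for (size_t k = 0; k < qs.size(); ++k) {
            size_t i = (worker + k) % qs.size();
            std::lock_guard<std::mutex> lk(ms[i]);
            if (!qs[i].empty()) {
                out = std::move(qs[i].front());
                qs[i].pop();
                return true;
            }
        }
        return false;
    }
private:
    std::vector<std::queue<std::function<void()>>> qs;
    std::vector<std::mutex> ms;    // sized once at construction, never resized (mutex is immovable)
    std::atomic<size_t> next{0};
};
```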

Combine object‑pool reuse and pre‑allocation for frequently created types (e.g., game entities, trade objects) to cut allocation latency and memory jitter.
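The combination can be sketched with a pre-allocated free-list pool over a hypothetical Entity type, so steady-state acquire/release does no heap allocation at all:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical pooled type.
struct Entity { int id = 0; bool alive = false; };

// Pre-allocated free-list pool: all storage is reserved once in the constructor.
class EntityPool {
public:
    explicit EntityPool(size_t cap) : storage(cap) {
        free_list.reserve(cap);                     // pre-allocate the index stack too
        for (size_t i = 0; i < cap; ++i) free_list.push_back(cap - 1 - i);
    }
    Entity* acquire() {
        if (free_list.empty()) return nullptr;      // pool exhausted
        Entity* e = &storage[free_list.back()];
        free_list.pop_back();
        e->alive = true;
        return e;
    }
    void release(Entity* e) {
        e->alive = false;
        free_list.push_back(static_cast<size_t>(e - storage.data()));
    }
private:
    std::vector<Entity> storage;                    // one up-front allocation
    std::vector<size_t> free_list;                  // indices of free slots
};
```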

Practical Guidelines

Profile before optimizing; identify whether the bottleneck is queue contention, memory allocation, or thread oversubscription.

Choose the appropriate optimization based on workload characteristics: lock‑free or sharded queues for extreme concurrency, priority queues for latency‑sensitive tasks.

Maintain clear, modular code; avoid premature over‑optimization.

Validate changes with functional, performance, and stress tests, and run leak detectors (Valgrind, ASan) regularly.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: Optimization, concurrency, Thread Pool, C++, lock-free queue, object pool
Written by Deepin Linux

Research areas: Windows & Linux platforms, C/C++ backend development, embedded systems and Linux kernel, etc.