Deep Dive into ThreadPoolExecutor: Principles, State Control, and Source Code Analysis
This article analyzes Java's ThreadPoolExecutor: the underlying JUC synchronizer framework, core and non‑core thread handling, state management, key configuration parameters, and a detailed walkthrough of critical source‑code methods such as execute, addWorker, and runWorker, with illustrative code examples.
I had planned to study the ThreadPoolExecutor source earlier but never found the time. This article is the result: a thorough analysis of its implementation, focusing on the execute method and related mechanisms.
ThreadPoolExecutor Fundamentals
ThreadPoolExecutor relies on the JUC synchronizer framework (AbstractQueuedSynchronizer, commonly called AQS), extensive bitwise operations, and CAS. It provides core threads (up to corePoolSize), optional non‑core threads (up to maximumPoolSize minus corePoolSize), a task queue, and rejection policies.
JUC Synchronizer Framework
Global lock mainLock (a ReentrantLock) protects access to the worker set and statistics.
Condition variable termination supports awaitTermination().
Task queue workQueue is a BlockingQueue that holds pending Runnables.
Internal class Worker wraps each pool thread and is itself an AQS-based lock.
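To make the role of mainLock and termination more concrete, here is a minimal sketch of the lock-plus-condition waiting pattern that awaitTermination() relies on. The class TerminationSketch and its boolean flag are hypothetical simplifications for illustration; the real pool checks its run state rather than a flag.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not part of the JDK.
class TerminationSketch {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Condition termination = mainLock.newCondition();
    private boolean terminated; // guarded by mainLock (assumption: a plain flag)

    /** Marks the sketch as terminated and wakes every waiter. */
    void terminate() {
        mainLock.lock();
        try {
            terminated = true;
            termination.signalAll();
        } finally {
            mainLock.unlock();
        }
    }

    /** Waits until terminate() has been called or the timeout elapses. */
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        mainLock.lock();
        try {
            while (!terminated) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                // awaitNanos releases the lock while waiting and returns the time left.
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

    /** Demo helper: terminates from another thread and reports what the waiter saw. */
    static boolean demo() {
        TerminationSketch sketch = new TerminationSketch();
        new Thread(sketch::terminate).start();
        try {
            return sketch.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The loop around awaitNanos guards against spurious wakeups, which is why the condition is rechecked after every return.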
Core Threads
A simplified pool with only core threads can be built as follows:
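import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class CoreThreadPool implements Executor {
    private final BlockingQueue<Runnable> workQueue;
    // Presumably used by the omitted Worker to number its threads.
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private final int coreSize;
    // Atomic, so concurrent execute() calls cannot lose an increment.
    private final AtomicInteger threadCount = new AtomicInteger();

    public CoreThreadPool(int coreSize) {
        this.coreSize = coreSize;
        this.workQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public void execute(Runnable command) {
        if (threadCount.incrementAndGet() <= coreSize) {
            // Still below the core size: start a new worker for this task.
            new Worker(command).start();
        } else {
            try {
                workQueue.put(command);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException(e);
            }
        }
    }
    // Worker and main method omitted for brevity
}

Running this example prints thread names and task indices, demonstrating lazy creation of core threads and blocking on take() when the queue is empty.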
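Since the Worker class and the main method are omitted, the following is one plausible completion, offered as a sketch rather than the original code. It assumes that Worker is a Thread subclass that runs its first task and then loops on workQueue.take(); the names CoreThreadPoolSketch, core-worker-N, and demo are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical completion of the example; the Worker shape is an assumption.
class CoreThreadPoolSketch implements Executor {
    private static final AtomicInteger COUNTER = new AtomicInteger();
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger threadCount = new AtomicInteger();
    private final int coreSize;

    CoreThreadPoolSketch(int coreSize) {
        this.coreSize = coreSize;
    }

    @Override
    public void execute(Runnable command) {
        if (threadCount.incrementAndGet() <= coreSize) {
            new Worker(command).start(); // core threads are created lazily
        } else {
            workQueue.add(command);      // unbounded queue, so add() always succeeds
        }
    }

    private final class Worker extends Thread {
        private Runnable firstTask;

        Worker(Runnable firstTask) {
            super("core-worker-" + COUNTER.incrementAndGet());
            this.firstTask = firstTask;
            setDaemon(true); // this sketch has no shutdown(), so do not block JVM exit
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            try {
                // Run the first task, then block on take() whenever the queue is empty.
                while (task != null || (task = workQueue.take()) != null) {
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        e.printStackTrace(); // keep the worker alive in this sketch
                    } finally {
                        task = null;
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: let the thread end.
            }
        }
    }

    /** Runs the given number of tasks and returns the distinct worker names used. */
    static Set<String> demo(int coreSize, int tasks) {
        CoreThreadPoolSketch pool = new CoreThreadPoolSketch(coreSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            final int index = i;
            pool.execute(() -> {
                String name = Thread.currentThread().getName();
                names.add(name);
                System.out.println(name + " ran task " + index);
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }
}
```

Calling CoreThreadPoolSketch.demo(2, 10) runs all ten tasks on at most two worker threads; which worker picks up which task varies from run to run.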
Additional Features
When constructing a full ThreadPoolExecutor, you specify corePoolSize, maximumPoolSize, keepAliveTime with its TimeUnit, the work queue, a ThreadFactory, and a RejectedExecutionHandler. If the queue is bounded and fills up while all core threads are busy, up to maximumPoolSize - corePoolSize additional non‑core threads are created; once those are saturated as well, the rejection policy is applied. The built-in rejection policies are AbortPolicy, DiscardPolicy, DiscardOldestPolicy, and CallerRunsPolicy.
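The saturation behavior can be checked with a small experiment. The sketch below assumes a bounded ArrayBlockingQueue and AbortPolicy, keeps every task blocked on a latch so that no capacity is freed, and counts how many submissions are accepted before the first rejection. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the ThreadPoolExecutor API calls come from the JDK.
class RejectionDemo {
    /** Returns how many blocking tasks were accepted before AbortPolicy rejected one. */
    static int acceptedBeforeRejection(int core, int max, int queueCapacity, int submitted) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
        int accepted = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until the demo ends
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // All threads are busy and the queue is full: AbortPolicy throws.
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }
}
```

With core = 1, max = 2, and a queue capacity of 1, three tasks are accepted (one per thread plus one queued) and the fourth is rejected, that is, maximumPoolSize plus the queue capacity.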
Source Code Analysis
Key fields of ThreadPoolExecutor (JDK 11) include:
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private final BlockingQueue<Runnable> workQueue;
    private final HashSet<Worker> workers = new HashSet<>();
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    // other code omitted
}

The longest constructor allows full customization:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // validation omitted
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Important methods:
execute(Runnable)
public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command)) reject(command);
        else if (workerCountOf(recheck) == 0) addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

The flow is: if fewer than corePoolSize workers exist, start a core worker with the task. Otherwise enqueue the task and recheck the state: if the pool is no longer running, remove and reject the task; if no worker is left, start one without a first task so the queue gets drained. If the task cannot be enqueued, try to start a non‑core worker, and apply the rejection policy if that fails too.
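The order of these steps can be observed from the outside. The following sketch, with a hypothetical class name and fixed pool parameters (core 2, max 3, queue capacity 1), submits four blocking tasks and records the pool size and queue size after each call to execute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
class ExecuteFlowDemo {
    /** Records "poolSize/queueSize" after each of four submissions. */
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> snapshots = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // block so that no worker becomes idle
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                snapshots.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return snapshots;
    }
}
```

The recorded values are 1/0, 2/0, 2/1, 3/1: two core workers are started first, the third task is queued, and only when the queue is full does the fourth task trigger a non-core worker.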
addWorker(Runnable, boolean)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Refuse new workers once shut down, unless a non-empty queue still needs draining.
        if (runStateAtLeast(c, SHUTDOWN) &&
            (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
            return false;
        for (;;) {
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c)) break retry;
            c = ctl.get(); // re-read ctl
            if (runStateAtLeast(c, SHUTDOWN)) continue retry;
            // otherwise the CAS failed because the worker count changed; retry the inner loop
        }
    }
    // create Worker, lock mainLock, add to workers set, start thread, etc.
    // omitted for brevity
}

The method first checks the run state, then reserves a slot by incrementing the worker count with CAS, and finally, under the protection of mainLock, adds the new Worker to the workers set before starting its thread.
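The slot reservation in the inner loop is an instance of a general CAS retry pattern. Here is a minimal sketch of that pattern in isolation, using a plain AtomicInteger with an upper bound; BoundedCounter is a hypothetical class, and it leaves out the run-state checks that addWorker interleaves with the loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class illustrating the CAS retry loop; not the JDK implementation.
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    /** Tries to reserve one slot; returns false once the limit has been reached. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;               // no capacity left
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                // slot reserved
            }
            // Another thread changed the count in between: re-read and retry.
        }
    }

    int get() {
        return count.get();
    }

    /** Lets several threads compete for slots and returns the final count. */
    static int demo(int limit, int threads) {
        BoundedCounter counter = new BoundedCounter(limit);
        Thread[] competitors = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            competitors[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.tryIncrement();
                }
            });
            competitors[i].start();
        }
        for (Thread t : competitors) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

Because the bound check and the increment are tied together by compareAndSet, the counter cannot overshoot the limit no matter how many threads compete.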
Worker Inner Class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() { runWorker(this); }
    // AQS lock methods omitted
    void interruptIfStarted() { /* interrupt logic */ }
}

Each Worker is both a Runnable (run by the thread) and an AQS synchronizer that provides a simple, non-reentrant exclusive lock. The worker holds this lock while it runs a task, which lets the pool tell idle workers from busy ones when it decides which threads to interrupt.
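The omitted AQS lock methods can be approximated as follows. This is a sketch written from the description above, with a hypothetical class name; it shows the two properties that matter here: the initial state of -1 makes tryLock() fail until the first unlock(), and the lock is not reentrant.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical stand-alone version of the lock part of Worker.
class SimpleWorkerLock extends AbstractQueuedSynchronizer {
    SimpleWorkerLock() {
        setState(-1); // "not started yet": tryLock() fails until the first unlock()
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only the transition 0 -> 1 succeeds, so the lock is non-reentrant.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // also turns the initial -1 into "unlocked"
        return true;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }
}
```

A caller that wants to interrupt only idle workers can use tryLock(): it succeeds only when the worker is not in the middle of a task.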
runWorker(Worker)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

The worker repeatedly fetches tasks via getTask(), handles interruption based on pool state, executes the task with beforeExecute/afterExecute hooks, and finally calls processWorkerExit when the loop ends.
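The two hooks are protected methods meant to be overridden in subclasses. As a small illustration, the hypothetical CountingPool below counts started and failed tasks; the quiet uncaught-exception handler is an assumption added only to keep the demo output clean.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass for illustration.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
              r -> {
                  Thread t = new Thread(r);
                  // Swallow the rethrown task exception so the demo prints no stack trace.
                  t.setUncaughtExceptionHandler((thread, ex) -> { });
                  return t;
              });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            failed.incrementAndGet(); // t is the exception thrown by task.run()
        }
    }

    /** Runs one succeeding and one failing task; returns {started, failed}. */
    static int[] demo() {
        CountingPool pool = new CountingPool(2);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }
}
```

Note that the Throwable is only passed to afterExecute for tasks submitted through execute; tasks submitted through submit are wrapped in a FutureTask, which captures the exception itself.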
State Control
The pool state and worker count are packed into a single int, held in the AtomicInteger ctl. The high 3 bits store the run state (RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED), and the low 29 bits store the worker count. Helper methods such as runStateOf, workerCountOf, runStateLessThan, and isRunning extract or compare these fields with bitwise operations and plain integer comparisons, while updates to ctl go through CAS (for example compareAndIncrementWorkerCount).
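The packing scheme is easy to reproduce in isolation. The sketch below mirrors the constant and helper names of the JDK 11 source in a hypothetical stand-alone class, so the bit layout can be inspected directly.

```java
// Hypothetical stand-alone copy of the ctl bit layout, for experimentation only.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;  // low 29 bits set

    // Run states live in the high 3 bits; their numeric order matters.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Because the state occupies the high bits, whole ctl values can be compared directly.
    static boolean runStateLessThan(int c, int s) { return c < s; }
    static boolean runStateAtLeast(int c, int s)  { return c >= s; }
    static boolean isRunning(int c)               { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));
        System.out.println(workerCountOf(c));
        System.out.println(isRunning(c));
    }
}
```

RUNNING is the only negative state, which is why isRunning can be written as a single comparison against SHUTDOWN.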
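The state transition diagram (image omitted) shows the overall progression RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED. Not every pool passes through every state: shutdown() moves RUNNING to SHUTDOWN, shutdownNow() moves RUNNING or SHUTDOWN to STOP, the pool enters TIDYING once no workers remain (and, when coming from SHUTDOWN, the queue is empty), and it reaches TERMINATED after the terminated() hook completes.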
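The internal states are not exposed directly, but the public methods isShutdown(), isTerminating(), and isTerminated() make part of the progression visible. The following sketch, with a hypothetical class name, records the three flags before shutdown(), right after it while a task is still running, and after termination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
class LifecycleDemo {
    private static String snapshot(ThreadPoolExecutor p) {
        return p.isShutdown() + "/" + p.isTerminating() + "/" + p.isTerminated();
    }

    /** Returns "isShutdown/isTerminating/isTerminated" at three points in the lifecycle. */
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        List<String> snapshots = new ArrayList<>();
        pool.execute(() -> {
            try {
                release.await(); // keep one task running across shutdown()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        snapshots.add(snapshot(pool)); // RUNNING
        pool.shutdown();
        snapshots.add(snapshot(pool)); // SHUTDOWN, task still in progress
        release.countDown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        snapshots.add(snapshot(pool)); // TERMINATED
        return snapshots;
    }
}
```

Because shutdown() was used rather than shutdownNow(), this run never enters STOP: the pool goes from SHUTDOWN to TIDYING to TERMINATED once the running task finishes.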
Overall, the design balances low‑level concurrency primitives (AQS, CAS, ReentrantLock) with high‑level policies to provide a flexible, high‑performance thread pool.
Source: http://8rr.co/2348