Unlocking Java’s AbstractQueuedSynchronizer: How AQS Powers Locks and Synchronizers
This article explains the core concepts and implementation details of Java's AbstractQueuedSynchronizer (AQS), covering its three main responsibilities, lock acquisition and release mechanisms, the internal Node structure, and both exclusive and shared lock algorithms with illustrative pseudocode.
AbstractQueuedSynchronizer (AQS) is a framework created by Doug Lea for building locks and other synchronizers in the java.util.concurrent package. Many J.U.C. utilities such as ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch rely on AQS.
AQS performs three primary tasks:
Managing synchronization state;
Maintaining a FIFO synchronization queue;
Blocking and waking up threads.
From a behavioral perspective, operations are divided into lock acquisition and lock release, while from a pattern perspective they are divided into exclusive locks and shared locks.
Implementation Principle
AQS internally maintains a FIFO queue to manage locks. A thread first attempts to acquire the lock; if it fails, it wraps itself and its wait status into a Node and inserts it into the synchronization queue, where it will be blocked until the owning thread releases the lock and wakes up its successor.
Lock Acquisition Pseudocode
<code>while (condition not met) {
wrap current thread into node and insert into sync queue
if (needs to block current thread) {
block current thread until awakened
}
remove current thread from sync queue
}
</code>Lock Release Pseudocode
<code>modify sync state
if (new state allows other threads to acquire lock) {
wake up successor thread
}
</code>AQS Core Data Structure: Node (inner class)
<code>/**
* When a shared resource is held by a thread, other threads requesting the resource will block and enter the sync queue.
*/
static final class Node {
/* Marker for a node waiting in shared mode */
static final Node SHARED = new Node();
/* Marker for a node waiting in exclusive mode */
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
volatile Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) throw new NullPointerException();
else return p;
}
Node() {}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
</code>Exclusive Lock Acquisition
<code>public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return node;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
</code>Exclusive Lock Release
<code>public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}
</code>Shared Lock Acquisition
<code>public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted) selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
</code>Shared Lock Release
<code>public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
</code>The entire AQS mechanism revolves around managing a linked list of Node objects that represent waiting threads, using CAS operations to safely insert, remove, and update nodes, while carefully handling cancellation, interruption, and propagation of wake‑up signals for both exclusive and shared synchronization modes.
Sanyou's Java Diary
Passionate about technology, though not great at solving problems; eager to share, never tire of learning!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.