Unlocking Java’s PriorityBlockingQueue: How Binary Heap Powers Priority Queues
This article explains how Java’s PriorityBlockingQueue implements a priority‑based unbounded blocking queue using a binary heap, covering heap structure, insertion and removal algorithms, underlying ReentrantLock synchronization, and key source code snippets that illustrate the sift‑up, sift‑down, and growth mechanisms.
A Thread's priority can be set with setPriority(int newPriority); the scheduler treats it as a hint that higher‑priority threads should generally run first. Queues sometimes need the same idea. Traditional blocking queues such as ArrayBlockingQueue and LinkedBlockingQueue are strictly FIFO, so ordering elements by priority calls for a different structure.
PriorityBlockingQueue is an unbounded blocking queue that supports element priority. By default, elements are ordered in natural ascending order, though a custom Comparator can be supplied. Elements with the same priority are not guaranteed to retain FIFO order.
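As a rough illustration of these ordering rules, the sketch below drains one queue built with natural ordering and one built with a reversed comparator. The class name OrderingDemo and the helper drain are made up for this example; the constructors, offer and poll are standard java.util.concurrent API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; not part of the JDK or the original article.
final class OrderingDemo {

    // Polls until the queue is empty and returns the elements in poll order.
    static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer head;
        while ((head = queue.poll()) != null) {
            out.add(head);
        }
        return out;
    }

    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(5);
        queue.offer(1);
        queue.offer(3);
        return drain(queue); // smallest element comes out first
    }

    static List<Integer> reversedOrder() {
        // 11 mirrors the default initial capacity; any positive value works.
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.offer(5);
        queue.offer(1);
        queue.offer(3);
        return drain(queue); // largest element comes out first
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder());  // [1, 3, 5]
        System.out.println(reversedOrder()); // [5, 3, 1]
    }
}
```

Only the poll order is guaranteed. Iterating over the queue exposes the internal heap layout, which is generally not sorted.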
Binary Heap
A binary heap is a complete (or nearly complete) binary tree that also satisfies the heap‑order property: in a max‑heap every parent is ≥ its children, and in a min‑heap every parent is ≤ its children. PriorityBlockingQueue uses a min‑heap, so the smallest element under its ordering sits at the root.
Binary heaps are usually stored in an array: for a node at index n, the left child is at 2*n+1, the right child at 2*(n+1), and the parent at (n-1)/2 (integer division, for n > 0).
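The index arithmetic can be checked with a few lines of Java. This is only a sketch: HeapIndex and its method names are invented for illustration, and parent relies on integer division, so it is meaningful only for n > 0.

```java
// Hypothetical helper class illustrating array-based heap indexing.
final class HeapIndex {

    static int left(int n) {
        return 2 * n + 1;
    }

    static int right(int n) {
        return 2 * (n + 1);
    }

    // Integer division rounds down, so both children map back to the same parent.
    static int parent(int n) {
        return (n - 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(left(3));   // 7
        System.out.println(right(3));  // 8
        System.out.println(parent(7)); // 3
        System.out.println(parent(8)); // 3
    }
}
```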
Add Element
To add an element, it is first placed at the end of the array (the "hole"). If the new element violates the heap order, it is repeatedly swapped with its parent until the heap property is restored.
Example: inserting 2 into a min‑heap.
Step 1 – Append 2 at the end.
Step 2 – 2 is smaller than its parent 6; swap them.
Step 3 – Continue comparing with the new parent 5; swap again.
Step 4 – Compare 2 with its next parent; once that parent is no larger than 2 (or 2 has reached the root), insertion is complete. The overall process is: append at the end, then "sift up" until the heap order is satisfied.
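The insertion steps can be sketched on a plain int array. The starting heap [1, 5, 3, 6, 7, 9, 10, 8] is an assumption, since the original figure is not reproduced here: it was chosen so that 2 first meets the parent 6 and then the parent 5, as in the steps above, and in this array 2 then stops below the smaller root 1. SiftUpDemo and insert are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical min-heap insertion on an int array; not JDK code.
final class SiftUpDemo {

    // Returns a new array that holds the heap with value inserted.
    static int[] insert(int[] heap, int value) {
        int[] array = Arrays.copyOf(heap, heap.length + 1);
        int k = heap.length;                 // the hole starts at the end
        while (k > 0) {
            int parent = (k - 1) / 2;
            if (value >= array[parent]) {
                break;                       // heap order already holds
            }
            array[k] = array[parent];        // move the parent down into the hole
            k = parent;                      // the hole moves up
        }
        array[k] = value;
        return array;
    }

    public static void main(String[] args) {
        int[] heap = {1, 5, 3, 6, 7, 9, 10, 8}; // assumed starting heap
        System.out.println(Arrays.toString(insert(heap, 2)));
        // [1, 2, 3, 5, 7, 9, 10, 8, 6]
    }
}
```

Moving parents down and writing the new value once at the end avoids a full three-assignment swap at every level.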
Remove Element
Removal mirrors insertion. The root element (index 0) is removed, the last element fills the hole, and then it is repeatedly swapped with the smaller of its two children until the heap order is restored.
Step 1 – Remove the root 1; move the last element 6 to the root.
Step 2 – Compare 6 with its children 2 and 3; the smaller child 2 moves up.
Step 3 – Continue with children 5 and 7; the smaller 5 moves up.
Step 4 – Finally compare with child 8; no further swap is needed. Removal is finished.
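A matching sketch for removal follows. The starting array [1, 2, 3, 5, 7, 9, 10, 8, 6] is an assumption chosen to agree with the values named in the steps (root 1, children 2 and 3, then 5 and 7, then 8, with 6 last); 9 and 10 are invented filler. SiftDownDemo and removeRoot are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical min-heap root removal on an int array; not JDK code.
final class SiftDownDemo {

    // Returns a new array that holds the heap without its root.
    static int[] removeRoot(int[] heap) {
        if (heap.length == 0) {
            throw new IllegalArgumentException("heap is empty");
        }
        int n = heap.length - 1;
        int[] array = Arrays.copyOf(heap, n); // drops the last slot
        if (n == 0) {
            return array;
        }
        int x = heap[n];                      // last element refills the root hole
        int k = 0;
        int half = n / 2;                     // indices >= half have no children
        while (k < half) {
            int child = 2 * k + 1;
            int right = child + 1;
            if (right < n && array[right] < array[child]) {
                child = right;                // pick the smaller child
            }
            if (x <= array[child]) {
                break;
            }
            array[k] = array[child];          // smaller child moves up
            k = child;
        }
        array[k] = x;
        return array;
    }

    public static void main(String[] args) {
        int[] heap = {1, 2, 3, 5, 7, 9, 10, 8, 6}; // assumed starting heap
        System.out.println(Arrays.toString(removeRoot(heap)));
        // [2, 5, 3, 6, 7, 9, 10, 8]
    }
}
```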
PriorityBlockingQueue
PriorityBlockingQueue extends AbstractQueue and implements BlockingQueue and Serializable.

    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable

Key fields:

    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private transient Object[] queue;
    private transient int size;
    private transient Comparator<? super E> comparator;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private volatile int allocationSpinLock;
    private PriorityQueue<E> q;

The queue needs only a single ReentrantLock and one Condition (notEmpty) because it is unbounded: producers never wait for space, so there is no notFull condition. Insertion fails, with an OutOfMemoryError, only when the array can no longer grow.
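To see the single notEmpty condition at work from the outside, the sketch below starts a consumer that calls take(), which waits while the queue is empty, and then hands it an element with put(), which returns without waiting. take() itself is not walked through in this article; TakeDemo and takeThenPut are hypothetical names.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo of a blocking take() paired with a non-blocking put().
final class TakeDemo {

    static int takeThenPut() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        int[] received = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();   // waits while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put(42);                        // never waits for space
        try {
            consumer.join();                  // join makes received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(takeThenPut()); // 42
    }
}
```

The result is the same whether the consumer reaches take() before or after the put, which is why the example needs no sleep or extra synchronization.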
Enqueue (put / add / offer)
The put(E e) method simply delegates to offer(e), which never blocks.

    public void put(E e) {
        offer(e); // never need to block
    }

The offer(E e) method validates the element, acquires the lock, grows the internal array if needed, and then performs a sift‑up operation using either natural ordering or the supplied comparator.
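
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // grow if necessary; tryGrow releases and re-acquires the lock
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal(); // wake one consumer waiting in take()
        } finally {
            lock.unlock();
        }
        return true;
    }

Sift‑up with natural ordering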

    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0) break;
            array[k] = e;   // move the parent down
            k = parent;
        }
        array[k] = key;
    }

Sift‑up with a comparator

    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                                  Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0) break;
            array[k] = e;   // move the parent down
            k = parent;
        }
        array[k] = x;
    }

Array growth (tryGrow)

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // release the main lock; allocation is guarded by a CAS spin lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
            try {
                // grow faster while small: roughly double below 64, then by 50%
                int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >>> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array) {
                    newArray = new Object[newCap];
                }
            } finally {
                allocationSpinLock = 0; // release spin lock
            }
        }
        if (newArray == null) Thread.yield(); // another thread is allocating; back off
        lock.lock();
        if (newArray != null && queue == array) {
            System.arraycopy(array, 0, newArray, 0, oldCap);
            queue = newArray;
        }
        // lock will be released by caller
    }

Dequeue (poll / remove)

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

The internal dequeue() method removes the root, moves the last element to the root, and then sifts it down.

    private E dequeue() {
        int n = size - 1;
        if (n < 0) return null;   // queue is empty
        Object[] array = queue;
        E result = (E) array[0];  // root is the head of the queue
        E x = (E) array[n];       // last element will refill the root
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }

Sift‑down with natural ordering

    private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>) x;
            int half = n >>> 1;   // loop while k is a non-leaf
            while (k < half) {
                int child = (k << 1) + 1;   // assume left child is smaller
                Object c = array[child];
                int right = child + 1;
                if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0) break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

Sift‑down with a comparator

    private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n,
                                                    Comparator<? super T> cmp) {
        if (n > 0) {
            int half = n >>> 1;   // loop while k is a non-leaf
            while (k < half) {
                int child = (k << 1) + 1;   // assume left child is smaller
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0) break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }

In summary, PriorityBlockingQueue relies on a binary min‑heap to maintain priority ordering. Adding an element performs a "sift‑up", while removing the head performs a "sift‑down". Because the queue is unbounded, insertion never blocks; if the array cannot grow any further, offer fails with an OutOfMemoryError instead.
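As a closing worked example, the growth rule from tryGrow can be traced from the default capacity of 11. The sketch copies only the arithmetic expression from the snippet above and deliberately ignores the MAX_ARRAY_SIZE clamp, so it is meaningful only for small capacities; GrowthDemo, nextCapacity and capacities are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical trace of the capacity growth rule; not JDK code.
final class GrowthDemo {

    // Same expression as in tryGrow, without the MAX_ARRAY_SIZE overflow handling.
    static int nextCapacity(int oldCap) {
        return oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >>> 1));
    }

    // Returns the initial capacity followed by the next `steps` capacities.
    static int[] capacities(int initial, int steps) {
        int[] caps = new int[steps + 1];
        caps[0] = initial;
        for (int i = 1; i <= steps; i++) {
            caps[i] = nextCapacity(caps[i - 1]);
        }
        return caps;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(capacities(11, 5)));
        // [11, 24, 50, 102, 153, 229]
    }
}
```

Below 64 slots the array slightly more than doubles on each growth; from 64 upward it grows by half, which keeps wasted space bounded for large queues.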
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
