Master Java’s Concurrent Containers: Deep Dive into ConcurrentHashMap and Queues
This article provides an in‑depth exploration of Java’s concurrent collection classes, covering the design and implementation of ConcurrentHashMap, ConcurrentLinkedQueue, and the seven blocking queue variants, complete with code analysis, performance considerations, and practical usage scenarios for high‑throughput multithreaded applications.
ConcurrentHashMap Usage and Principles
This chapter introduces the core concepts of ConcurrentHashMap , a thread-safe hash table introduced in JDK 1.5. It replaces HashMap (not thread-safe) and Hashtable (thread-safe but fully synchronized, so a scalability bottleneck) in high-concurrency scenarios.
Typical usage includes creating a counter that updates a map atomically:
<code>public class Counter {
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public void increase(String key) {
map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
}
public int get(String key) {
return map.getOrDefault(key, 0);
}
}</code>The compute method guarantees atomic read‑modify‑write semantics, eliminating explicit synchronization.
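To see this guarantee in action, a small multi-threaded harness (not from the original article; the class name CounterDemo and the thread counts are illustrative) can hammer one key from several threads and confirm that no increments are lost:

```java
import java.util.concurrent.ConcurrentHashMap;

public class CounterDemo {
    static final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    static void increase(String key) {
        // compute applies the remapping function atomically per key
        map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) increase("hits");
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        // No lost updates despite 4 unsynchronized writer threads
        System.out.println(map.get("hits")); // 40000
    }
}
```

With a plain HashMap and no locking, the same experiment would typically print a smaller number, because concurrent read-modify-write cycles overwrite each other.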
Implementation Overview (JDK 8)
Internally ConcurrentHashMap maintains an array of Node<K,V> objects. Each node stores the hash, key, value, and a volatile next reference for chaining. The map combines CAS operations with per-bin synchronized blocks (replacing the segment-based lock striping of JDK 7) and serves reads entirely without blocking.
Key fields:
<code>static final int MAXIMUM_CAPACITY = 1 << 30;
static final int DEFAULT_CAPACITY = 16;
static final float LOAD_FACTOR = 0.75f;
transient volatile Node<K,V>[] table;
transient volatile int sizeCtl;</code>sizeCtl encodes the state of the table: 0 means uninitialized with the default capacity pending, -1 indicates initialization in progress, values less than -1 mean a resize is in progress, and a positive value holds the initial capacity before the table is created and the next resize threshold (capacity × load factor) afterwards.
Put Operation
The putVal method first computes the hash, then attempts to insert the node. If the target bucket is empty, a CAS inserts the node without locking. If a collision occurs, the bucket's first node is locked and the chain is traversed; the list is converted to a red-black tree when it exceeds TREEIFY_THRESHOLD (8) and the table has at least MIN_TREEIFY_CAPACITY (64) buckets (otherwise the table is resized instead).
<code>public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // lazy initialization on first insert
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<>(hash, key, value, null)))
                break; // empty bin: CAS insert, no lock needed
        } else if (f.hash == MOVED) {
            tab = helpTransfer(tab, f); // a resize is running: help move buckets
        } else {
            synchronized (f) { // lock only this bin's first node
                if (tabAt(tab, i) == f) {
                    // traverse the list or tree; insert, or replace if key exists
                }
            }
        }
    }
    addCount(1L, binCount); // update the size and trigger a resize if needed
    return null; // simplified: the real method returns the previous value
}</code>Get Operation
The get method performs a lock‑free read. It first checks the first node of the bucket; if the hash matches, the value is returned. If the node is a forwarding node during resize, the method follows the nextTable . Otherwise it traverses the linked list or tree.
<code>public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            // fast path: the bin's first node is the match
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        } else if (eh < 0)
            // negative hash: forwarding node (resize) or tree bin; delegate to find
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            // slow path: walk the collision chain
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}</code>Resize Mechanism
When the number of entries exceeds threshold = capacity * loadFactor , the map doubles its table size. Resizing is performed by multiple threads cooperatively. Each thread claims a range of buckets (the transferIndex ) and moves entries to the new table, using ForwardingNode markers to indicate that a bucket has been transferred.
<code>private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// calculate stride based on CPU count
// each thread repeatedly claims a range and moves nodes
// after all buckets are moved, replace table with nextTab
}</code>ConcurrentLinkedQueue Usage and Principles
ConcurrentLinkedQueue is an unbounded, lock-free, thread-safe FIFO queue based on a singly-linked list. It manipulates the head, tail, and next references with CAS operations (via Unsafe in JDK 8, VarHandle from JDK 9 on), providing high throughput for producer-consumer patterns.
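Before looking at the internals, a minimal producer-consumer sketch (the class name ClqDemo is illustrative) shows the non-blocking contract: offer never blocks, and poll returns null rather than waiting, so consumers typically spin or back off:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class ClqDemo {
    // Runs one producer and one consumer over a shared lock-free queue
    // and returns how many items the consumer saw.
    static int run(int items) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < items; i++)
                queue.offer(i);           // never blocks: the queue is unbounded
        });
        final int[] seen = {0};           // written only by the consumer thread
        Thread consumer = new Thread(() -> {
            while (seen[0] < items) {
                Integer v = queue.poll(); // lock-free; null when currently empty
                if (v != null) seen[0]++;
                else Thread.onSpinWait(); // spin briefly instead of blocking
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return seen[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(1000)); // 1000
    }
}
```

If blocking consumers are needed, a BlockingQueue (covered later) is the better fit; ConcurrentLinkedQueue shines when threads must never park.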
Node structure:
<code>static final class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) { ITEM.set(this, item); }
Node() {}
void appendRelaxed(Node<E> next) { NEXT.set(this, next); }
boolean casItem(E cmp, E val) { return ITEM.compareAndSet(this, cmp, val); }
}</code>Offer (Enqueue)
The offer method appends a new node at the tail using a CAS loop. The tail pointer is updated lazily to reduce contention.
<code>public boolean offer(E e) {
    final Node<E> newNode = new Node<>(Objects.requireNonNull(e));
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is the true last node: try to append
            if (NEXT.compareAndSet(p, null, newNode)) {
                // hop tail forward only every other append (lazy update)
                if (p != t)
                    TAIL.weakCompareAndSet(this, t, newNode);
                return true;
            }
        } else if (p == q) {
            // p was unlinked (self-linked): restart from a fresh tail, or head
            p = (t = tail) != null ? t : head;
        } else {
            // advance; re-read tail if another thread has moved it
            p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}</code>Poll (Dequeue)
The poll method removes the head node. It first marks the item as null using CAS, then advances the head pointer.
<code>public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;; p = q) { // p = q advances past consumed nodes
            final E item;
            if ((item = p.item) != null && p.casItem(item, null)) {
                // claimed the item; lazily move head forward if we passed it
                if (p != h)
                    updateHead(h, (q = p.next) != null ? q : p);
                return item;
            } else if ((q = p.next) == null) {
                // reached the end: queue is empty
                updateHead(h, p);
                return null;
            } else if (p == q) {
                // p was unlinked by a concurrent poll: restart from the new head
                continue restartFromHead;
            }
        }
    }
}</code>Java Blocking Queues (7 Variants)
JDK 1.8 provides seven implementations of BlockingQueue , each suited to different concurrency patterns.
ArrayBlockingQueue – fixed‑size array, uses a single ReentrantLock and two Condition objects ( notEmpty , notFull ).
LinkedBlockingQueue – optionally bounded linked list, employs separate locks for put and take to reduce contention.
PriorityBlockingQueue – unbounded priority heap, single lock, no blocking on put .
DelayQueue – unbounded queue of Delayed elements, ordered by remaining delay; take blocks until the head element’s delay expires.
SynchronousQueue – zero‑capacity queue; each put must wait for a matching take . Implemented with either a lock‑free transfer stack (non‑fair, the default) or a transfer queue (fair).
LinkedTransferQueue – unbounded linked queue that supports transfer operations allowing producers to wait for consumers.
LinkedBlockingDeque – double‑ended version of LinkedBlockingQueue , supporting insertion and removal at both ends.
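All seven implement the same BlockingQueue interface, so a producer-consumer pair can be written once against the interface and the concrete queue swapped freely. A minimal sketch (the class name BlockingQueueDemo and the sizes are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    // Drains `count` items produced by a background thread and returns their sum.
    static int pipe(BlockingQueue<Integer> queue, int count) throws InterruptedException {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++)
                    queue.put(i);          // blocks when a bounded queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < count; i++)
            sum += queue.take();           // blocks while the queue is empty
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 8 forces the producer to block, exercising notFull/notEmpty
        System.out.println(pipe(new ArrayBlockingQueue<>(8), 100)); // 4950
    }
}
```

Because only the constructor names the implementation, switching from ArrayBlockingQueue to, say, LinkedBlockingQueue changes the locking behavior without touching the producer or consumer code.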
ArrayBlockingQueue
Backed by a fixed array, it uses a single lock and two conditions. Elements are stored in a circular buffer indexed by takeIndex and putIndex .
<code>public void put(E e) throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  // respond to interrupts while acquiring
    try {
        while (count == items.length)
            notFull.await();   // block while the circular buffer is full
        enqueue(e);            // store at putIndex, advance it, increment count
        notEmpty.signal();     // wake one waiting consumer on every insert
    } finally {
        lock.unlock();
    }
}</code>LinkedBlockingQueue
Uses a singly‑linked list with separate putLock and takeLock . This allows concurrent producers and consumers without interfering with each other.
<code>public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;  // count before insert; declared here so it survives the lock
    final Node<E> node = new Node<>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity)
            notFull.await();
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();  // still room: cascade the wakeup to other producers
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();  // queue went from empty to non-empty: wake consumers
}</code>PriorityBlockingQueue
Implemented as a binary heap stored in an array. Elements are ordered according to their natural ordering or a supplied Comparator . The queue is unbounded, so put never blocks; take blocks only when the queue is empty.
<code>public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);  // releases the lock while allocating, then retries
    try {
        // sift the new element up the binary heap to restore heap order
        if (comparator == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, comparator);
        size = n + 1;
        notEmpty.signal();    // a take may be blocked on an empty queue
    } finally {
        lock.unlock();
    }
    return true;
}</code>DelayQueue
Wraps a PriorityQueue of Delayed elements. The head element’s delay determines how long take blocks.
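Elements must implement the Delayed interface, which the article does not show. A minimal sketch (the DelayedTask class is hypothetical) storing an absolute expiry time; note that compareTo must agree with getDelay so the soonest-expiring element sits at the head:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // Hypothetical element type: a name plus an absolute expiry time in nanos.
    static final class DelayedTask implements Delayed {
        final String name;
        final long expiresAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(),
                                TimeUnit.NANOSECONDS);
        }

        // Ordering must match getDelay so the heap's head expires first
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("later", 200));
        queue.put(new DelayedTask("soon", 50));
        System.out.println(queue.take().name); // "soon": shorter delay wins
        System.out.println(queue.take().name);
    }
}
```

The take implementation, shown next, is where the blocking-until-expiry logic lives.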
<code>public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                notEmpty.await();              // nothing queued yet
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();           // head has expired
                if (leader != null)
                    notEmpty.await();          // another thread is timing the head
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;       // become leader for this head
                    try {
                        notEmpty.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            notEmpty.signal();                 // pass the baton to another waiter
        lock.unlock();
    }
}</code>SynchronousQueue
A zero‑capacity queue where each put must wait for a corresponding take . The non‑fair implementation uses a lock‑free transfer stack; the fair version uses a transfer queue.
<code>public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // transfer returns null only when interrupted (untimed mode)
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();  // clear the flag before throwing
        throw new InterruptedException();
    }
}

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null) return e;
    Thread.interrupted();
    throw new InterruptedException();
}</code>LinkedTransferQueue
Combines the features of LinkedBlockingQueue and SynchronousQueue . Producers can either enqueue data or wait for a consumer using transfer . Consumers can similarly wait for a producer.
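The practical difference between transfer and an ordinary put is visible in a short sketch (the class name TransferDemo is illustrative): transfer does not return until a consumer has received the element, while put would enqueue it and return immediately:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("received: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() blocks until the consumer has taken the element;
        // put()/offer() would enqueue and return at once instead.
        queue.transfer("job-1");
        consumer.join();

        // The timed variant gives up if no consumer arrives in time
        boolean handedOff = queue.tryTransfer("job-2", 50, TimeUnit.MILLISECONDS);
        System.out.println("job-2 handed off: " + handedOff); // false: nobody waiting
    }
}
```

The untimed tryTransfer(e) goes one step further: it succeeds only if a consumer is already waiting, and otherwise returns false without enqueuing anything.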
<code>// Simplified view of the shared transfer logic (JDK 8 signature)
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 1. scan from head for a node of the opposite mode (data vs. request)
    // 2. if one is found, CAS its item to complete the match and return
    // 3. otherwise append a node and, depending on `how`, return at once
    //    (NOW/ASYNC), spin then park until matched (SYNC), or time out (TIMED)
}
</code>LinkedBlockingDeque
A double‑ended version of LinkedBlockingQueue . It supports blocking insertion and removal at both the head and the tail, which suits work‑stealing style consumers; note that, unlike LinkedBlockingQueue's two‑lock design, it guards both ends with a single ReentrantLock.
<code>// Queue-style put/take delegate to the deque's blocking end operations:
public void put(E e) throws InterruptedException { putLast(e); }
public E take() throws InterruptedException { return takeFirst(); }
// Each end also has its own blocking pair: putFirst/putLast and takeFirst/takeLast;
// the non-blocking relatives (addFirst, pollLast, ...) throw or return null instead.
</code>Choosing the Right Queue
Typical selections:
Network request buffering – ArrayBlockingQueue (bounded, predictable memory).
Task scheduling with priorities – PriorityBlockingQueue .
Direct hand‑off between threads – SynchronousQueue .
High‑throughput logging or I/O decoupling – LinkedBlockingQueue (unbounded or bounded as needed).
Performance tips include monitoring queue size via JMX, using appropriate rejection policies for thread pools, and dynamically adjusting capacities for bounded queues.
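For the thread-pool point, a bounded queue plus an explicit rejection policy can be wired up as follows (a sketch; the class name BoundedPoolDemo and all sizes are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                               // core and max threads
                60, TimeUnit.SECONDS,               // idle keep-alive for extra threads
                new ArrayBlockingQueue<>(100),      // bounded: predictable memory use
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure on overflow

        for (int i = 0; i < 10; i++) {
            final int task = i;
            pool.execute(() -> System.out.println("task " + task
                    + " on " + Thread.currentThread().getName()));
        }

        // pool.getQueue().size() is the figure you would export for monitoring
        System.out.println("queued: " + pool.getQueue().size());
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

CallerRunsPolicy makes the submitting thread execute the task itself when both the queue and the pool are saturated, which throttles producers instead of dropping work; AbortPolicy (the default) would throw RejectedExecutionException instead.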
Sanyou's Java Diary
Passionate about technology, though not great at solving problems; eager to share, never tire of learning!