Understanding LinkedBlockingQueue: Implementation, Principles, and Usage
This article provides a comprehensive overview of Java's LinkedBlockingQueue, covering its FIFO behavior, internal linked‑list structure, concurrency controls with separate put and take locks, detailed method implementations, and a multithreaded example demonstrating safe usage versus non‑thread‑safe alternatives.
LinkedBlockingQueue is a thread‑safe, FIFO (first‑in‑first‑out) blocking queue implemented with a singly linked list. It can be created with an optional capacity (default is Integer.MAX_VALUE) and supports concurrent producers and consumers.
1. Introduction
LinkedBlockingQueue sorts elements FIFO, inserting new elements at the tail and removing from the head. Its throughput is usually higher than array‑based queues, though its performance can be less predictable under heavy contention.
It also supports an optional capacity to prevent unbounded growth; if not specified, the capacity defaults to Integer.MAX_VALUE.
2. Principles and Data Structure
The queue consists of a linked list with a dummy head node. Key fields include: head: reference to the dummy head node. last: reference to the tail node. count: current number of elements (an AtomicInteger). capacity: maximum allowed size. putLock and takeLock: separate locks for insertion and removal. notEmpty and notFull: conditions associated with the two locks.
Insertion uses putLock and signals notEmpty when the queue transitions from empty to non‑empty. Removal uses takeLock and signals notFull when the queue transitions from full to not‑full.
3. Method Summary
LinkedBlockingQueue()
LinkedBlockingQueue(Collection<? extends E> c)
LinkedBlockingQueue(int capacity)
void clear()
int drainTo(Collection<? super E> c)
int drainTo(Collection<? super E> c, int maxElements)
Iterator<E> iterator()
boolean offer(E e)
bool offer(E e, long timeout, TimeUnit unit)
E peek()
E poll()
E poll(long timeout, TimeUnit unit)
bool put(E e)
int remainingCapacity()
bool remove(Object o)
int size()
E take() throws InterruptedException
Object[] toArray()
<T> T[] toArray(T[] a)
String toString()4. Source Code Analysis
Creation
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}Key fields are declared as:
private final int capacity;
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head; // dummy head
private transient Node<E> last; // tail
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();The node class:
static class Node<E> {
E item; // data
Node<E> next; // next pointer
Node(E x) { item = x; }
}Insertion (offer)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
if (count.get() == capacity) return false;
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
int c = count.getAndIncrement();
if (c + 1 < capacity) notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) signalNotEmpty();
return c >= 0;
}Removal (take)
public E take() throws InterruptedException {
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) notEmpty.await();
E x = dequeue();
int c = count.getAndDecrement();
if (c > 1) notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) signalNotFull();
return x;
}Helper methods enqueue, dequeue, signalNotEmpty, and signalNotFull manage the linked list pointers and condition signalling.
5. Traversal
The iterator acquires both locks to provide a snapshot‑consistent view. It returns an inner class Itr that walks the linked list safely.
public Iterator<E> iterator() { return new Itr(); }The Itr class holds references to the current node, last returned node, and current element, and implements hasNext, next, and remove with full locking.
6. Example Usage
The following demo creates a shared LinkedBlockingQueue<String>, starts two threads that each add six elements and print the whole queue after each insertion. When using LinkedBlockingQueue the program runs correctly; replacing it with LinkedList causes a ConcurrentModificationException because LinkedList is not thread‑safe.
import java.util.*;
import java.util.concurrent.*;
public class LinkedBlockingQueueDemo1 {
private static Queue<String> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) {
new MyThread("ta").start();
new MyThread("tb").start();
}
private static void printAll() {
Iterator<String> iter = queue.iterator();
while (iter.hasNext()) {
System.out.print(iter.next() + ", ");
}
System.out.println();
}
private static class MyThread extends Thread {
MyThread(String name) { super(name); }
@Override
public void run() {
for (int i = 1; i <= 6; i++) {
String val = Thread.currentThread().getName() + i;
queue.add(val);
printAll();
}
}
}
}Sample output shows interleaved additions from both threads, confirming that LinkedBlockingQueue handles concurrent modifications safely.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
