Understanding Java BlockingQueue: Concepts, Implementations, and a Custom Example
This article explains the fundamentals of queues and blocking queues, describes how Java's java.util.concurrent.BlockingQueue works, outlines its main methods and common implementations, and provides a complete custom BlockingQueue source code example for multithreaded producer‑consumer scenarios.
An interview scenario introduces the need to understand queues and blocking queues in Java.
1. What is a Queue?
A queue is a linear data structure that allows insertion at the rear and removal at the front (FIFO), unlike a stack which is LIFO.
The queue behaves like a line of people: the first element (front) is removed first, and new elements are added at the tail.
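The FIFO ordering described above can be seen in a short sketch. It uses java.util.ArrayDeque as the queue implementation; the class name FifoDemo and the method name enqueueThenDrain are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FifoDemo {
    // Adds three elements at the tail, then removes from the head until empty.
    static List<String> enqueueThenDrain() {
        Queue<String> queue = new ArrayDeque<>();
        queue.offer("first");
        queue.offer("second");
        queue.offer("third");

        List<String> removalOrder = new ArrayList<>();
        while (!queue.isEmpty()) {
            removalOrder.add(queue.poll()); // poll() removes the current head
        }
        return removalOrder;
    }

    public static void main(String[] args) {
        System.out.println(enqueueThenDrain()); // prints [first, second, third]
    }
}
```

Elements come out in the same order they went in, which is the property a stack does not have.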
2. What is a BlockingQueue?
A blocking queue suspends the consumer when the queue is empty and suspends the producer when the queue is full, implementing the classic producer‑consumer model where threads are blocked until the opposite side can proceed.
3. How to Implement a BlockingQueue
Since Java 5, the java.util.concurrent package provides the BlockingQueue interface and several ready‑made implementations, so developers only need to use the API without handling the low‑level blocking logic themselves.
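As a rough illustration of relying on the ready-made API, the sketch below runs one producer and one consumer over an ArrayBlockingQueue. The class name ProducerConsumerDemo, the method name run, and the capacity of 2 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    // Runs one producer and one consumer over a queue of capacity 2 and
    // returns the items in the order the consumer received them.
    static List<Integer> run(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Neither thread contains any explicit waiting or signalling code; put and take handle the blocking on both sides.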
4. Using BlockingQueue
BlockingQueue is an interface that defines the contract for blocking queues; concrete classes provide the actual behavior.
4.1 Main Methods of BlockingQueue
Insertion
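(1) offer(E e): inserts the element and returns true, or returns false immediately if the queue is full (non‑blocking).
(2) offer(E e, long timeout, TimeUnit unit): waits up to the specified timeout for space when the queue is full, and returns false if the timeout elapses first.
(3) put(E e): blocks indefinitely until space becomes available; throws InterruptedException if the waiting thread is interrupted.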
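The difference between the insertion methods is easiest to see on a queue that is already full. The following sketch assumes a capacity of 1 and a 100 ms timeout; InsertionDemo and tryInserts are names invented for the example.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InsertionDemo {
    // Returns the results of: offer on an empty queue, offer on a full queue,
    // and a timed offer on a full queue.
    static boolean[] tryInserts() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        boolean first = queue.offer("a");  // true: there is room
        boolean second = queue.offer("b"); // false: full, returns immediately
        boolean third;
        try {
            // Waits up to 100 ms for space, then gives up and returns false.
            third = queue.offer("c", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            third = false;
        }
        // queue.put("d") at this point would block until another thread removed "a".
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(tryInserts())); // prints [true, false, false]
    }
}
```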
Retrieval
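(1) poll(): removes and returns the head of the queue, or returns null immediately if the queue is empty.
(2) poll(long timeout, TimeUnit unit): waits up to the timeout for an element and returns null if none arrives in time.
(3) take(): blocks until an element is available; throws InterruptedException if the waiting thread is interrupted.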
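A matching sketch for the retrieval side, run against an initially empty LinkedBlockingQueue. The names RetrievalDemo and describe, and the 100 ms timeout, are assumptions for illustration only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RetrievalDemo {
    // Returns the results of poll(), timed poll(), and take() joined by commas.
    static String describe() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String emptyPoll = queue.poll(); // null: queue is empty, returns immediately
        try {
            // Waits up to 100 ms; nothing arrives, so this is also null.
            String timedPoll = queue.poll(100, TimeUnit.MILLISECONDS);
            queue.put("x");
            String taken = queue.take(); // returns "x" without waiting
            return emptyPoll + "," + timedPoll + "," + taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints null,null,x
    }
}
```

Calling take() on the empty queue instead would have blocked the thread until some other thread inserted an element.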
4.2 Common Implementations of BlockingQueue
1. ArrayBlockingQueue: a bounded queue backed by an array; uses a single lock, which can limit concurrency.
2. LinkedBlockingQueue: optionally bounded, backed by a linked list; uses two locks, allowing concurrent producers and consumers.
3. DelayQueue: an unbounded priority queue for elements that implement Delayed, releasing elements only after their delay expires.
4. PriorityBlockingQueue: an unbounded priority queue based on a heap; only consumers block, producers never block.
5. SynchronousQueue: has no internal capacity; each insert must wait for a corresponding remove, effectively a hand‑off between producer and consumer.
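Two of the behaviours listed above are easy to check in a few lines: PriorityBlockingQueue returns elements by priority rather than insertion order, and SynchronousQueue refuses a non-blocking insert when no consumer is waiting. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class ImplementationsDemo {
    // Inserts 3, 1, 2 and drains the queue; elements come out in natural order.
    static List<Integer> priorityOrder() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(3); // put never blocks here because the queue is unbounded
        queue.put(1);
        queue.put(2);

        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    // With no thread waiting in take(), a non-blocking offer cannot hand off.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    public static void main(String[] args) {
        System.out.println(priorityOrder());        // prints [1, 2, 3]
        System.out.println(offerWithoutConsumer()); // prints false
    }
}
```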
5. Custom BlockingQueue Implementation
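The following code demonstrates a simple hand‑crafted blocking queue inspired by ArrayBlockingQueue, using an array as a circular buffer, a ReentrantLock, and two Condition objects to coordinate producers and consumers. Note that its offer method waits when the queue is full, so it behaves like put in the standard interface rather than like the non‑blocking offer.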
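import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal array-backed blocking queue.
 *
 * @author yz
 * @version 1.0
 * @date 2020/10/31 11:24
 */
public class YzBlockingQuery {
    private Object[] tab;   // circular buffer holding the elements
    private int takeIndex;  // index of the next element to take
    private int putIndex;   // index of the next free slot
    private int size;       // current number of elements
    private ReentrantLock reentrantLock = new ReentrantLock();
    private Condition notEmpty; // consumers wait here while the queue is empty
    private Condition notFull;  // producers wait here while the queue is full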
    public YzBlockingQuery(int tabCount) {
        if (tabCount <= 0) {
            // The original created the exception without throwing it.
            throw new IllegalArgumentException("capacity must be positive");
        }
        tab = new Object[tabCount];
        notEmpty = reentrantLock.newCondition();
        notFull = reentrantLock.newCondition();
    }
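    // Inserts obj, waiting while the queue is full.
    // Returns false only if the thread is interrupted while waiting.
    public boolean offer(Object obj) {
        if (obj == null) { throw new NullPointerException(); }
        // Acquire the lock before the try block so that finally never
        // calls unlock() on a lock this thread does not hold.
        reentrantLock.lock();
        try {
            while (size == tab.length) {
                System.out.println("Queue is full");
                notFull.await();
            }
            tab[putIndex] = obj;
            if (++putIndex == tab.length) { putIndex = 0; } // wrap around
            size++;
            notEmpty.signal(); // wake one waiting consumer
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            reentrantLock.unlock();
        }
    }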
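    // Removes and returns the head, waiting while the queue is empty.
    // Returns null only if the thread is interrupted while waiting.
    public Object take() {
        reentrantLock.lock();
        try {
            while (size == 0) {
                System.out.println("Queue is empty");
                notEmpty.await();
            }
            Object obj = tab[takeIndex];
            tab[takeIndex] = null; // drop the reference so it can be garbage collected
            if (++takeIndex == tab.length) { takeIndex = 0; } // wrap around
            size--;
            notFull.signal(); // wake one waiting producer
            return obj;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } finally {
            reentrantLock.unlock();
        }
    }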
    public static void main(String[] args) {
        YzBlockingQuery yzBlockingQuery = new YzBlockingQuery(5);
        // Producer: inserts faster than the consumer removes, so the queue fills up.
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
                yzBlockingQuery.offer(i);
                System.out.println("Producer produced: " + i);
            }
        });
        // Consumer: removes one element per second.
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
                Object take = yzBlockingQuery.take();
                System.out.println("Consumer consumed: " + take);
            }
        });
        thread1.start();
        thread2.start();
    }
}