
Master Java Thread Communication: wait/notify, Locks, Conditions & BlockingQueue

This article explores Java's thread communication techniques—including the basic wait/notify mechanism, advanced Lock and Condition usage, and the thread‑safe BlockingQueue—providing detailed explanations, producer‑consumer examples, and additional concurrency utilities like CountDownLatch, illustrating how to coordinate threads efficiently in multithreaded applications.

Xuanwu Backend Tech Stack

In multithreaded development, communication between threads is crucial for ensuring data consistency and safety. Java offers several mechanisms to achieve this, each suited to different scenarios.

When multiple threads run concurrently, coordinating their behavior is key to building efficient and reliable concurrent programs.

Wait/Notify Mechanism (wait/notify)

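Java's Object class provides wait(), notify(), and notifyAll() for thread communication. All three must be called while holding the object's monitor, that is, inside a synchronized method or block on that object; otherwise they throw IllegalMonitorStateException.

wait() : releases the monitor and puts the current thread into a waiting state until another thread calls notify() or notifyAll() on the same object, or until the thread is interrupted. Because a wakeup can also occur spuriously, wait() should always be called in a loop that re-checks the condition.

notify() : wakes one thread that is waiting on the object's monitor. Which thread is chosen is arbitrary and left to the JVM.

notifyAll() : wakes all threads waiting on the object's monitor; they then compete to reacquire it.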
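
The rules above combine into what is often called the guarded-wait idiom: hold the monitor, test the condition in a while loop, and call wait() only while the condition is false. The sketch below is a minimal illustration rather than code from the original article; the Gate and GateDemo names are hypothetical.

```java
/** Hypothetical one-shot gate: threads block in awaitOpen() until open() is called. */
class Gate {
    private boolean open; // guarded by the monitor of this Gate

    public synchronized void awaitOpen() throws InterruptedException {
        // Loop, not if: a waiter may wake spuriously and must re-check the flag.
        while (!open) {
            wait(); // releases the monitor while waiting, reacquires it before returning
        }
    }

    public synchronized void open() {
        open = true;
        notifyAll(); // every waiter re-checks the flag and proceeds
    }
}

class GateDemo {
    /** Starts one waiter, opens the gate, and reports whether the waiter got through. */
    static boolean run() throws InterruptedException {
        Gate gate = new Gate();
        boolean[] passed = new boolean[1]; // written by the waiter, read after join()
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        waiter.join();
        return passed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter passed the gate: " + run());
    }
}
```

Because the flag is read and written under the same monitor, the demo behaves the same whether open() runs before or after the waiter reaches wait().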

Below is a producer‑consumer example using wait/notify:

import java.util.LinkedList;

/**
 * Thread communication – Producer‑Consumer model
 */
public class MessageQueue {
    /** Message queue */
    private final LinkedList<Message> queue = new LinkedList<>();
    /** Queue capacity */
    private final int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Produce a message */
    public synchronized void produce(Message message) {
        try {
            while (queue.size() >= capacity) {
                System.out.println("Queue full, producer thread " + Thread.currentThread().getName() + " waiting...");
                wait();
            }
            queue.offer(message);
            System.out.println("Produced message: " + message.getContent() + " queue size: " + queue.size());
            notifyAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Consume a message */
    public synchronized Message consume() {
        try {
            while (queue.isEmpty()) {
                System.out.println("Queue empty, consumer thread " + Thread.currentThread().getName() + " waiting...");
                wait();
            }
            Message message = queue.poll();
            System.out.println("Consumed message: " + message.getContent() + " queue size: " + queue.size());
            notifyAll();
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

Test code for the above model:

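import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MessageQueueTest {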
    public static void main(String[] args) {
        // Create a queue with capacity 3
        MessageQueue messageQueue = new MessageQueue(3);
        ExecutorService producerPool = Executors.newFixedThreadPool(3);
        ExecutorService consumerPool = Executors.newFixedThreadPool(2);
        // Start 3 producer threads
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            producerPool.submit(() -> {
                try {
                    for (int j = 1; j <= 3; j++) {
                        Message message = createMessage(producerId, j);
                        messageQueue.produce(message);
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Start 2 consumer threads
        for (int i = 0; i < 2; i++) {
            consumerPool.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        messageQueue.consume();
                        Thread.sleep(500);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Graceful shutdown logic omitted for brevity
    }
    private static Message createMessage(int producerId, int messageId) {
        Message message = new Message();
        message.setContent("Message from Producer-" + producerId + " ID-" + messageId);
        message.setCreateTime(new Date());
        return message;
    }
}
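
Both listings above use a Message class that the article does not show. A minimal version consistent with the calls made above (getContent, setContent, setCreateTime) might look like the following; the field layout, the getCreateTime accessor, and the MessageDemo class are assumptions.

```java
import java.util.Date;

/** Hypothetical message type; only what the listings above call is strictly required. */
class Message {
    private String content;
    private Date createTime;

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    @Override
    public String toString() {
        return "Message{content='" + content + "', createTime=" + createTime + "}";
    }
}

class MessageDemo {
    public static void main(String[] args) {
        Message message = new Message();
        message.setContent("Message from Producer-0 ID-1");
        message.setCreateTime(new Date());
        System.out.println(message.getContent());
    }
}
```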

Lock and Condition

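In the java.util.concurrent.locks package, Java provides the Lock and Condition interfaces. Compared with wait/notify, they are more flexible: a single Lock can have several Condition objects, so a thread can signal exactly the group of waiters it cares about, and lock acquisition can be timed or interruptible.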

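import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class OrderProcessor {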
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition orderReceived = lock.newCondition();
    private final Condition orderProcessed = lock.newCondition();
    private final Queue<Order> orderQueue = new LinkedList<>();
    private volatile boolean hasNewOrder = false;

    public void receiveOrder(Order order) {
        lock.lock();
        try {
            orderQueue.offer(order);
            hasNewOrder = true;
            System.out.println("Received new order: " + order);
            orderReceived.signal();
        } finally {
            lock.unlock();
        }
    }

    public void processOrder() {
        lock.lock();
        try {
            while (!hasNewOrder) {
                orderReceived.await();
            }
            Order order = orderQueue.poll();
            if (order != null) {
                System.out.println("Processing order: " + order);
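                // Simulated work. Sleeping here keeps the lock held, which blocks
                // receiveOrder(); real code would do slow work after unlocking.
                Thread.sleep(1000);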
                order.setStatus(OrderStatus.PROCESSED);
                orderProcessed.signal();
            }
            hasNewOrder = !orderQueue.isEmpty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}
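
orderReceived.signal() : wakes one processing thread that is waiting for a new order.

orderProcessed.signal() : notifies one thread waiting for the order-processing result (e.g., a shipping step); no such waiter appears in this listing.

Test code for OrderProcessor:

import java.math.BigDecimal;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderProcessorTest {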
    public static void main(String[] args) {
        OrderProcessor processor = new OrderProcessor();
        ExecutorService orderCreators = Executors.newFixedThreadPool(3);
        ExecutorService orderProcessors = Executors.newFixedThreadPool(2);
        // Simulate order creation
        for (int i = 0; i < 5; i++) {
            orderCreators.submit(() -> {
                try {
                    Order order = createRandomOrder();
                    processor.receiveOrder(order);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Start processing threads
        for (int i = 0; i < 2; i++) {
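            orderProcessors.submit(() -> {
                // processOrder() catches InterruptedException itself and restores the
                // interrupt flag, so the loop condition is what ends this worker.
                // (Catching InterruptedException here would not compile, because
                // processOrder() does not declare it.)
                while (!Thread.currentThread().isInterrupted()) {
                    processor.processOrder();
                }
            });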
        }
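        // Creators finish on their own; the processors loop until interrupted,
        // so gracefulShutdown() falls back to shutdownNow() after its timeout.
        gracefulShutdown(orderCreators, orderProcessors);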
    }
    private static Order createRandomOrder() {
        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString().substring(0, 8));
        order.setStatus(OrderStatus.NEW);
        order.setCreateTime(new Date());
        order.setAmount(new BigDecimal(String.format("%.2f", 100 + Math.random() * 900)));
        return order;
    }
    private static void gracefulShutdown(ExecutorService... pools) {
        for (ExecutorService pool : pools) {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("All orders processed");
    }
}
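
The precise wake-up advantage is easiest to see with two Condition objects on one lock, so that producers wait on one and consumers on the other. The following bounded buffer is a minimal sketch of that idea and is not part of the original article; BoundedBuffer and BoundedBufferDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical bounded buffer with separate wait sets for producers and consumers. */
class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.add(item);
            notEmpty.signal();   // wake one consumer only
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal();    // wake one producer only
            return item;
        } finally {
            lock.unlock();
        }
    }
}

class BoundedBufferDemo {
    /** Pushes 1..count through a capacity-2 buffer and returns the sum the consumer saw. */
    static int sumThroughBuffer(int count) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += buffer.take();
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumThroughBuffer(10)); // prints 55
    }
}
```

With wait/notify the same class would typically need notifyAll(), because a single monitor cannot tell producers from consumers; here put() signals only notEmpty and take() signals only notFull.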

BlockingQueue

BlockingQueue is a thread-safe queue whose put() and take() methods block while the queue is full or empty, so producers and consumers need no additional synchronization. This makes it ideal for producer-consumer scenarios. Below is a log-processing example.

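import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class LogProcessor {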
    private final BlockingQueue<LogInfo> logQueue = new LinkedBlockingQueue<>(1000);
    private final ExecutorService executorService = Executors.newFixedThreadPool(3);
    private volatile boolean running = true;

    public void start() {
        for (int i = 0; i < 3; i++) {
            executorService.submit(this::processLogs);
        }
    }

    public void submitLog(LogInfo logEvent) {
        try {
            logQueue.put(logEvent);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processLogs() {
        while (running) {
            try {
                LogInfo logEvent = logQueue.take();
                processLogEvent(logEvent);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void processLogEvent(LogInfo logInfo) {
        switch (logInfo.getLevel()) {
            case ERROR:
                System.err.println("Urgent alert: " + logInfo);
                sendAlert(logInfo);
                break;
            case WARN:
                System.out.println("Warning stats: " + logInfo);
                aggregateWarning(logInfo);
                break;
            default:
                System.out.println("Normal log: " + logInfo);
                saveLog(logInfo);
        }
    }

    private void sendAlert(LogInfo logInfo) {
        System.out.println("====> Sending alert email: " + logInfo.getMessage());
    }

    private void aggregateWarning(LogInfo logInfo) {
        System.out.println("====> Adding to warning stats: " + logInfo.getMessage());
    }

    private void saveLog(LogInfo logInfo) {
        System.out.println("====> Saving log to storage: " + logInfo.getMessage());
    }
}
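
Test code for LogProcessor:

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LogProcessorTest {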
    public static void main(String[] args) {
        LogProcessor logProcessor = new LogProcessor();
        logProcessor.start();
        ExecutorService producerPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            final int serviceId = i % 5;
            producerPool.submit(() -> {
                try {
                    LogInfo logInfo = generateRandomLog(serviceId);
                    logProcessor.submitLog(logInfo);
                    System.out.println("Service " + serviceId + " generated log: " + logInfo);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        shutdownAndAwaitTermination(producerPool);
        // Additional shutdown logic omitted for brevity
    }
    private static LogInfo generateRandomLog(int serviceId) {
        LogInfo logInfo = new LogInfo();
        int randomLevel = (int) (Math.random() * 100);
        LogLevel level = randomLevel < 10 ? LogLevel.ERROR : (randomLevel < 30 ? LogLevel.WARN : LogLevel.INFO);
        logInfo.setLevel(level);
        logInfo.setServiceName("Service-" + serviceId);
        logInfo.setMessage(generateLogMessage(level, serviceId));
        logInfo.setTimestamp(new Date());
        return logInfo;
    }
    private static String generateLogMessage(LogLevel level, int serviceId) {
        switch (level) {
            case ERROR:
                return String.format("Service %d error: ERROR-%d", serviceId, (int) (Math.random() * 1000));
            case WARN:
                return String.format("Service %d warning: WARN-%d", serviceId, (int) (Math.random() * 1000));
            default:
                return String.format("Service %d processing: INFO-%d", serviceId, (int) (Math.random() * 1000));
        }
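    }
    /** Stops accepting tasks, waits up to 10 seconds, then cancels whatever is still running. */
    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}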
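
LogProcessor above never sets running to false, and a worker blocked in take() would not notice the flag until another element arrives or the thread is interrupted. One common way to stop such consumers, shown here as an assumption rather than the article's approach, is a sentinel "poison pill" element placed on the queue. The PoisonPillDemo class and the POISON constant are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel; a distinct instance so it is recognized by identity, not by value.
    private static final String POISON = new String("POISON");

    /** Sends the inputs through a small queue and returns what the consumer saw before stopping. */
    static List<String> consumeAll(List<String> inputs) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        List<String> seen = new ArrayList<>(); // touched only by the consumer until join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // blocks while the queue is empty
                    if (item == POISON) {
                        break;                  // sentinel reached: stop consuming
                    }
                    seen.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String input : inputs) {
            queue.put(input); // blocks while the queue is full
        }
        queue.put(POISON);    // one pill is enough for the single consumer here
        consumer.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeAll(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

With several consumer threads, as in LogProcessor, one pill per consumer would be needed so that each worker sees its own stop signal.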

Concurrency Utilities

Java also provides higher‑level concurrency utilities such as CountDownLatch, Semaphore, and CyclicBarrier. Below is an example using CountDownLatch for system initialization.

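import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SystemInitializer {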
    private final CountDownLatch startupLatch;
    private final List<InitializationTask> initTasks;

    public SystemInitializer(List<InitializationTask> tasks) {
        this.initTasks = tasks;
        this.startupLatch = new CountDownLatch(tasks.size());
    }

    public void initialize() {
        ExecutorService executor = Executors.newFixedThreadPool(initTasks.size());
        for (InitializationTask task : initTasks) {
            executor.submit(() -> {
                try {
                    System.out.println("Starting init: " + task.getName());
                    task.initialize();
                    System.out.println("Init completed: " + task.getName());
                } catch (Exception e) {
                    // log error omitted
                } finally {
                    startupLatch.countDown();
                }
            });
        }
        try {
            boolean allCompleted = startupLatch.await(60, TimeUnit.SECONDS);
            if (!allCompleted) {
                throw new TimeoutException("System initialization timed out");
            }
            System.out.println("System initialization completed");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("System initialization interrupted", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("System initialization timed out");
        } finally {
            executor.shutdown();
        }
    }
}
interface InitializationTask {
    String getName();
    void initialize() throws Exception;
}
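
Semaphore is mentioned above but not demonstrated. As a hedged sketch of its typical use, the following limits how many tasks run a section at the same time and records the peak concurrency observed. SemaphoreDemo and its method names are hypothetical, and the 20 ms sleep merely stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreDemo {
    /** Runs the given number of tasks and returns the highest number seen inside the guarded section. */
    static int peakConcurrency(int permits, int tasks) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger(); // tasks currently inside the section
        AtomicInteger peak = new AtomicInteger();   // largest value of active observed
        CountDownLatch done = new CountDownLatch(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(tasks);

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    semaphore.acquire();          // blocks until a permit is free
                    try {
                        int now = active.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20);         // stand-in for real work
                    } finally {
                        active.decrementAndGet();
                        semaphore.release();      // always hand the permit back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        done.await();    // same CountDownLatch pattern as SystemInitializer
        pool.shutdown();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + peakConcurrency(2, 6));
    }
}
```

The exact peak depends on scheduling, but it can never exceed the number of permits, because a task increments the counter only while holding one.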

Summary

By using appropriate thread‑communication mechanisms, we can ensure that multiple threads cooperate correctly and avoid resource contention. Java provides several options—wait/notify, Lock/Condition, BlockingQueue, and higher‑level utilities like CountDownLatch—each suited to different scenarios, and developers should choose the one that best fits their specific requirements.

wait/notify : suitable for simple synchronization scenarios.

Lock and Condition : offers more flexible thread control.

BlockingQueue : a naturally thread‑safe queue ideal for producer‑consumer patterns.

CountDownLatch, Semaphore, CyclicBarrier : useful for coordinating multiple threads.
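
CyclicBarrier appears in the list above without an example. The sketch below, which is illustrative and not from the original article, has several workers meet at a barrier once per phase; the barrier action runs once each time all parties have arrived. BarrierDemo and runPhases are hypothetical names.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    /** Runs the workers through the given number of phases and returns how often the barrier tripped. */
    static int runPhases(int workers, int phases) throws InterruptedException {
        AtomicInteger completedPhases = new AtomicInteger();
        // The barrier action runs once per phase, after the last worker arrives.
        CyclicBarrier barrier = new CyclicBarrier(workers, completedPhases::incrementAndGet);

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                try {
                    for (int p = 0; p < phases; p++) {
                        // per-phase work would go here
                        barrier.await(); // wait until every worker has finished this phase
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party was interrupted or timed out; stop this worker
                }
            });
            threads[w].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return completedPhases.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("phases completed: " + runPhases(3, 2)); // prints "phases completed: 2"
    }
}
```

Unlike CountDownLatch, the barrier resets after each trip, which is why the same instance can be reused for every phase.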

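Quick-Recall Version

The main ways for threads to communicate in Java are:

Shared memory : threads exchange data through shared variables (guarded with synchronized, volatile, and so on); synchronization and concurrency issues need care.

Wait/notify mechanism (wait/notify) : wait() puts a thread into the waiting state and notifyAll() wakes the waiting threads; both must be used inside a synchronized block.

Lock and Condition : a more flexible notification mechanism than wait/notify that allows precise wake-ups.

Blocking queues : BlockingQueue and its implementations (such as ArrayBlockingQueue and LinkedBlockingQueue) provide thread-safe queues that simplify producer-consumer cooperation.

Concurrency utilities : CountDownLatch, Semaphore, Exchanger, CyclicBarrier, and others in the java.util.concurrent package further simplify communication and synchronization between threads.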
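
Exchanger is the one utility in this list that the article does not otherwise touch on. As a minimal sketch (the ExchangerDemo name and the string values are hypothetical), two threads meet at exchange() and each receives the other's object:

```java
import java.util.concurrent.Exchanger;

class ExchangerDemo {
    /** Swaps one value between the calling thread and a worker; returns {callerGot, workerGot}. */
    static String[] swap(String fromCaller, String fromWorker) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] workerGot = new String[1]; // written by the worker, read after join()

        Thread worker = new Thread(() -> {
            try {
                workerGot[0] = exchanger.exchange(fromWorker); // blocks until the caller arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        String callerGot = exchanger.exchange(fromCaller); // blocks until the worker arrives
        worker.join();
        return new String[] { callerGot, workerGot[0] };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] result = swap("from-main", "from-worker");
        System.out.println("main got " + result[0] + ", worker got " + result[1]);
        // prints "main got from-worker, worker got from-main"
    }
}
```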
